Skip to content
Snippets Groups Projects
Select Git revision
  • 04bcebcaa902f192fd9bf385ec2b16325e783acc
  • master default protected
  • Work_Gijs protected
3 results

set_rcu2_via_PCC_version.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    PCC.py 7.82 KiB
    # -*- coding: utf-8 -*-
    #
    # This file is part of the PCC project
    #
    #
    #
    # Distributed under the terms of the APACHE license.
    # See LICENSE.txt for more info.
    
    """ PCC Device Server for LOFAR2.0
    
    """
    
    # PyTango imports
    from tango import DebugIt
    from tango.server import run, command
    from tango.server import device_property
    from tango import AttrWriteType
    import numpy
    # Additional import
    
    from src.wrappers import *
    
    from clients.opcua_connection import OPCUAConnection
    from src.attribute_wrapper import attribute_wrapper
    from src.hardware_device import hardware_device
    from src.lofar_logging import device_logging_to_python
    
    __all__ = ["PCC", "main"]
    
    @device_logging_to_python({"device": "PCC"})
    class PCC(hardware_device):
        """Tango device server class for the PCC, backed by an OPC-UA connection.

        Attributes are declared with ``attribute_wrapper`` using a
        ``comms_annotation`` of the form ``["2:PCC", "2:<attribute name>"]`` and
        are bound to an ``OPCUAConnection`` in :meth:`initialise`.

        **Properties:**

        - Device Property
            OPC_Server_Name
                - Type:'DevString'
            OPC_Server_Port
                - Type:'DevULong'
            OPC_Time_Out
                - Type:'DevDouble'
            OPC_namespace
                - Type:'DevString' (not mandatory)
        """

        # -----------------
        # Device Properties
        # -----------------

        OPC_Server_Name = device_property(
            dtype='DevString',
            mandatory=True
        )

        OPC_Server_Port = device_property(
            dtype='DevULong',
            mandatory=True
        )

        OPC_Time_Out = device_property(
            dtype='DevDouble',
            mandatory=True
        )
        # NOTE(review): OPC_namespace is declared here but is not referenced
        # anywhere in this class; initialise() passes the literal
        # "http://lofar.eu" to OPCUAConnection instead -- confirm intent.
        OPC_namespace = device_property(
            dtype='DevString',
            mandatory=False
        )

        # ----------
        # Attributes
        # ----------
        # Attributes whose name ends in _RW are declared with
        # AttrWriteType.READ_WRITE; the others do not pass ``access`` and so use
        # whatever default attribute_wrapper applies.
        RCU_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_mask_RW"], datatype=numpy.bool_, dims=(32,), access=AttrWriteType.READ_WRITE)
        Ant_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:Ant_mask_RW"], datatype=numpy.bool_, dims=(3, 32), access=AttrWriteType.READ_WRITE)
        RCU_attenuator_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_attenuator_R"], datatype=numpy.int64, dims=(3, 32))
        RCU_attenuator_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_attenuator_RW"], datatype=numpy.int64, dims=(3, 32),
                                              access=AttrWriteType.READ_WRITE)
        RCU_band_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_band_R"], datatype=numpy.int64, dims=(3, 32))
        RCU_band_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_band_RW"], datatype=numpy.int64, dims=(3, 32), access=AttrWriteType.READ_WRITE)
        RCU_temperature_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_temperature_R"], datatype=numpy.float64, dims=(32,))
        RCU_Pwr_dig_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_Pwr_dig_R"], datatype=numpy.int64, dims=(32,))
        RCU_LED0_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED0_R"], datatype=numpy.int64, dims=(32,))
        RCU_LED0_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED0_RW"], datatype=numpy.int64, dims=(32,), access=AttrWriteType.READ_WRITE)
        RCU_ADC_lock_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_lock_R"], datatype=numpy.int64, dims=(3, 32))
        RCU_ADC_SYNC_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_SYNC_R"], datatype=numpy.int64, dims=(3, 32))
        RCU_ADC_JESD_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_JESD_R"], datatype=numpy.int64, dims=(3, 32))
        RCU_ADC_CML_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_CML_R"], datatype=numpy.int64, dims=(3, 32))
        RCU_OUT1_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_OUT1_R"], datatype=numpy.int64, dims=(3, 32))
        RCU_OUT2_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_OUT2_R"], datatype=numpy.int64, dims=(3, 32))
        RCU_ID_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ID_R"], datatype=numpy.int64, dims=(32,))
        RCU_version_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_version_R"], datatype=numpy.str_, dims=(32,))

        HBA_element_beamformer_delays_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_beamformer_delays_R"], datatype=numpy.int64,
                                                            dims=(32, 96))
        HBA_element_beamformer_delays_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_beamformer_delays_RW"], datatype=numpy.int64,
                                                             dims=(32, 96), access=AttrWriteType.READ_WRITE)
        HBA_element_pwr_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_pwr_R"], datatype=numpy.int64, dims=(32, 96))
        HBA_element_pwr_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_pwr_RW"], datatype=numpy.int64, dims=(32, 96),
                                               access=AttrWriteType.READ_WRITE)

        RCU_monitor_rate_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_monitor_rate_RW"], datatype=numpy.float64,
                                                access=AttrWriteType.READ_WRITE)

        def delete_device(self):
            """Hook to delete resources allocated in init_device.

            This method allows for any memory or other resources allocated in the
            init_device method to be released.  This method is called by the device
            destructor and by the device Init command (a Tango built-in).
            """
            self.debug_stream("Shutting down...")

            # Off() is not defined in this class; presumably inherited from
            # hardware_device -- verify.
            self.Off()
            self.debug_stream("Shut down.  Good bye.")

        # --------
        # overloaded functions
        # --------
        def off(self):
            """Stop the OPC-UA client. Called when the state is set to OFF."""
            # Stop keep-alive
            self.OPCua_client.stop()

        def initialise(self):
            """Set up the OPC-UA client and bind it to every attribute.

            Called when the state is set to INIT. Creates ``function_mapping``,
            builds the ``OPCUAConnection`` from the device properties, hands the
            client to each attribute returned by ``attr_list()``, then starts
            the client.
            """

            # Init the dict that contains function to OPC-UA function mappings.
            # NOTE(review): every entry starts as an empty dict, which is not
            # callable, while the commands below call these entries. Nothing in
            # this class replaces them with callables -- confirm where (or
            # whether) that happens.
            self.function_mapping = {
                name: {}
                for name in (
                    "RCU_on",
                    "RCU_off",
                    "ADC_on",
                    "RCU_update",
                    "CLK_on",
                    "CLK_off",
                    "CLK_PLL_setup",
                )
            }

            # set up the OPC ua client
            self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu",
                                                self.OPC_Time_Out, self.Fault, self)

            # map the attributes to the OPC ua comm client
            for attr in self.attr_list():
                try:
                    attr.set_comm_client(self.OPCua_client)
                except Exception as e:
                    # Stay best-effort (one failing attribute must not abort
                    # initialisation), but report the failure instead of
                    # hiding it. Catching Exception rather than using a bare
                    # except lets KeyboardInterrupt/SystemExit propagate.
                    self.warn_stream("error while setting the PCC attribute {} read/write function. {}".format(attr, e))

            self.OPCua_client.start()

        # --------
        # Commands
        # --------
        # Each command below calls the entry stored in ``function_mapping``
        # under its own name. ``only_when_on`` and ``fault_on_error`` are not
        # defined in this file (presumably provided by ``src.wrappers``).
        @command()
        @DebugIt()
        @only_when_on
        @fault_on_error
        def RCU_off(self):
            """Call the entry registered as ``function_mapping["RCU_off"]``.

            :return:None
            """
            self.function_mapping["RCU_off"]()

        @command()
        @DebugIt()
        @only_when_on
        @fault_on_error
        def RCU_on(self):
            """Call the entry registered as ``function_mapping["RCU_on"]``.

            :return:None
            """
            self.function_mapping["RCU_on"]()

        @command()
        @DebugIt()
        @only_when_on
        @fault_on_error
        def ADC_on(self):
            """Call the entry registered as ``function_mapping["ADC_on"]``.

            :return:None
            """
            self.function_mapping["ADC_on"]()

        @command()
        @DebugIt()
        @only_when_on
        @fault_on_error
        def RCU_update(self):
            """Call the entry registered as ``function_mapping["RCU_update"]``.

            :return:None
            """
            self.function_mapping["RCU_update"]()

        @command()
        @DebugIt()
        @only_when_on
        @fault_on_error
        def CLK_off(self):
            """Call the entry registered as ``function_mapping["CLK_off"]``.

            :return:None
            """
            self.function_mapping["CLK_off"]()

        @command()
        @DebugIt()
        @only_when_on
        @fault_on_error
        def CLK_on(self):
            """Call the entry registered as ``function_mapping["CLK_on"]``.

            :return:None
            """
            self.function_mapping["CLK_on"]()

        @command()
        @DebugIt()
        @only_when_on
        @fault_on_error
        def CLK_PLL_setup(self):
            """Call the entry registered as ``function_mapping["CLK_PLL_setup"]``.

            :return:None
            """
            self.function_mapping["CLK_PLL_setup"]()
    
    # ----------
    # Run server
    # ----------
    def main(args=None, **kwargs):
        """Run the Tango device server that hosts the PCC device class.

        :param args: forwarded to ``run`` as its ``args`` keyword.
        :param kwargs: forwarded unchanged to ``run``.
        :return: whatever ``run`` returns.
        """
        device_classes = (PCC,)
        return run(device_classes, args=args, **kwargs)
    
    
    # Start the device server only when this file is executed as a script,
    # not when it is imported as a module.
    if __name__ == '__main__':
        main()