Skip to content
Snippets Groups Projects
PCC.py 7.78 KiB
Newer Older
# -*- coding: utf-8 -*-
#
# This file is part of the PCC project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.

""" PCC Device Server for LOFAR2.0

"""

# PyTango imports
from tango import DebugIt
from tango.server import run, command
from tango.server import device_property
Taya Snijder's avatar
Taya Snijder committed
from tango import AttrWriteType
# Additional import
from clients.opcua_connection import OPCUAConnection
Taya Snijder's avatar
Taya Snijder committed
from src.attribute_wrapper import attribute_wrapper
from src.hardware_device import hardware_device
Taya Snijder's avatar
Taya Snijder committed
from src.lofar_logging import device_logging_to_python

__all__ = ["PCC", "main"]

Taya Snijder's avatar
Taya Snijder committed
@device_logging_to_python({"device": "PCC"})
class PCC(hardware_device):
Taya Snijder's avatar
Taya Snijder committed
    **Properties:**
Taya Snijder's avatar
Taya Snijder committed
    - Device Property
        OPC_Server_Name
            - Type:'DevString'
        OPC_Server_Port
            - Type:'DevULong'
        OPC_Time_Out
            - Type:'DevDouble'
    """
Thomas Juerges's avatar
Thomas Juerges committed
    # -----------------
    # Device Properties
    # -----------------
Thomas Juerges's avatar
Thomas Juerges committed
    OPC_Server_Name = device_property(
        dtype='DevString',
        mandatory=True
    )
Thomas Juerges's avatar
Thomas Juerges committed
    OPC_Server_Port = device_property(
        dtype='DevULong',
        mandatory=True
    )
Thomas Juerges's avatar
Thomas Juerges committed
    OPC_Time_Out = device_property(
        dtype='DevDouble',
        mandatory=True
    )
    OPC_namespace = device_property(
Thomas Juerges's avatar
Thomas Juerges committed
        dtype='DevString',
        mandatory=False
    )
Thomas Juerges's avatar
Thomas Juerges committed
    # ----------
    # Attributes
    # ----------
    RCU_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_mask_RW"], datatype=numpy.bool_, dims=(32,), access=AttrWriteType.READ_WRITE)
    Ant_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:Ant_mask_RW"], datatype=numpy.bool_, dims=(3, 32), access=AttrWriteType.READ_WRITE)
    RCU_attenuator_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_attenuator_R"], datatype=numpy.int64, dims=(3, 32))
    RCU_attenuator_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_attenuator_RW"], datatype=numpy.int64, dims=(3, 32),
                                          access=AttrWriteType.READ_WRITE)
Thomas Juerges's avatar
Thomas Juerges committed
    RCU_band_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_band_R"], datatype=numpy.int64, dims=(3, 32))
    RCU_band_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_band_RW"], datatype=numpy.int64, dims=(3, 32), access=AttrWriteType.READ_WRITE)
    RCU_temperature_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_temperature_R"], datatype=numpy.float64, dims=(32,))
    RCU_Pwr_dig_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_Pwr_dig_R"], datatype=numpy.int64, dims=(32,))
    RCU_LED0_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED0_R"], datatype=numpy.int64, dims=(32,))
    RCU_LED0_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED0_RW"], datatype=numpy.int64, dims=(32,), access=AttrWriteType.READ_WRITE)
    RCU_ADC_lock_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_lock_R"], datatype=numpy.int64, dims=(3, 32))
    RCU_ADC_SYNC_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_SYNC_R"], datatype=numpy.int64, dims=(3, 32))
    RCU_ADC_JESD_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_JESD_R"], datatype=numpy.int64, dims=(3, 32))
    RCU_ADC_CML_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_CML_R"], datatype=numpy.int64, dims=(3, 32))
    RCU_OUT1_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_OUT1_R"], datatype=numpy.int64, dims=(3, 32))
    RCU_OUT2_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_OUT2_R"], datatype=numpy.int64, dims=(3, 32))
    RCU_ID_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ID_R"], datatype=numpy.int64, dims=(32,))
    RCU_version_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_version_R"], datatype=numpy.str_, dims=(32,))
    HBA_element_beamformer_delays_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_beamformer_delays_R"], datatype=numpy.int64,
                                                        dims=(32, 96))
    HBA_element_beamformer_delays_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_beamformer_delays_RW"], datatype=numpy.int64,
                                                         dims=(32, 96), access=AttrWriteType.READ_WRITE)
Thomas Juerges's avatar
Thomas Juerges committed
    HBA_element_pwr_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_pwr_R"], datatype=numpy.int64, dims=(32, 96))
    HBA_element_pwr_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_pwr_RW"], datatype=numpy.int64, dims=(32, 96),
                                           access=AttrWriteType.READ_WRITE)
    RCU_monitor_rate_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_monitor_rate_RW"], datatype=numpy.float64,
                                            access=AttrWriteType.READ_WRITE)
Thomas Juerges's avatar
Thomas Juerges committed
    def delete_device(self):
        """Hook to delete resources allocated in init_device.
Taya Snijder's avatar
Taya Snijder committed
        This method allows for any memory or other resources allocated in the
        init_device method to be released.  This method is called by the device
        destructor and by the device Init command (a Tango built-in).
        """
Thomas Juerges's avatar
Thomas Juerges committed
        self.debug_stream("Shutting down...")
Thomas Juerges's avatar
Thomas Juerges committed
        self.Off()
        self.debug_stream("Shut down.  Good bye.")
Thomas Juerges's avatar
Thomas Juerges committed
    # --------
    # overloaded functions
    # --------
    def off(self):
        """ user code here. is called when the state is set to OFF """
        # Stop keep-alive
Thomas Juerges's avatar
Thomas Juerges committed
    def initialise(self):
        """ user code here. is called when the state is set to INIT """
        # Init the dict that contains function to OPC-UA function mappings.
        self.function_mapping = {}
        self.function_mapping["RCU_on"] = {}
        self.function_mapping["RCU_off"] = {}
        self.function_mapping["ADC_on"] = {}
        self.function_mapping["RCU_update"] = {}
        self.function_mapping["CLK_on"] = {}
        self.function_mapping["CLK_off"] = {}
        self.function_mapping["CLK_PLL_setup"] = {}

        # set up the OPC ua client
        self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu",
                                            self.OPC_Time_Out, self.Fault, self)
Thomas Juerges's avatar
Thomas Juerges committed
        # map the attributes to the OPC ua comm client
        for i in self.attr_list():
            try:
                i.set_comm_client(self.OPCua_client)
            except:
                pass
Thomas Juerges's avatar
Thomas Juerges committed
        self.OPCua_client.start()
Thomas Juerges's avatar
Thomas Juerges committed
    # --------
    # Commands
    # --------
    @command()
    @DebugIt()
    @only_when_on
    @fault_on_error
    def RCU_off(self):
        """
Taya Snijder's avatar
Taya Snijder committed
        :return:None
        """
Thomas Juerges's avatar
Thomas Juerges committed
        self.function_mapping["RCU_off"]()
Thomas Juerges's avatar
Thomas Juerges committed
    @command()
    @DebugIt()
    @only_when_on
    @fault_on_error
    def RCU_on(self):
        """
Taya Snijder's avatar
Taya Snijder committed
        :return:None
        """
Thomas Juerges's avatar
Thomas Juerges committed
        self.function_mapping["RCU_on"]()
Thomas Juerges's avatar
Thomas Juerges committed
    @command()
    @DebugIt()
    @only_when_on
    @fault_on_error
    def ADC_on(self):
        """

Taya Snijder's avatar
Taya Snijder committed
        :return:None
        """
Thomas Juerges's avatar
Thomas Juerges committed
        self.function_mapping["ADC_on"]()

    @command()
    @DebugIt()
    @only_when_on
    @fault_on_error
    def RCU_update(self):
        """

Taya Snijder's avatar
Taya Snijder committed
        :return:None
        """
Thomas Juerges's avatar
Thomas Juerges committed
        self.function_mapping["RCU_update"]()

    @command()
    @DebugIt()
    @only_when_on
    @fault_on_error
    def CLK_off(self):
        """

Taya Snijder's avatar
Taya Snijder committed
        :return:None
        """
Thomas Juerges's avatar
Thomas Juerges committed
        self.function_mapping["CLK_off"]()

    @command()
    @DebugIt()
    @only_when_on
    @fault_on_error
    def CLK_on(self):
        """

Taya Snijder's avatar
Taya Snijder committed
        :return:None
        """
Thomas Juerges's avatar
Thomas Juerges committed
        self.function_mapping["CLK_on"]()

    @command()
    @DebugIt()
    @only_when_on
    @fault_on_error
    def CLK_PLL_setup(self):
        """

Taya Snijder's avatar
Taya Snijder committed
        :return:None
        """
Thomas Juerges's avatar
Thomas Juerges committed
        self.function_mapping["CLK_PLL_setup"]()


# ----------
# Run server
# ----------
def main(args=None, **kwargs):
Thomas Juerges's avatar
Thomas Juerges committed
    """Main function of the PCC module."""
    return run((PCC,), args=args, **kwargs)


if __name__ == '__main__':
Thomas Juerges's avatar
Thomas Juerges committed
    main()