Skip to content
Snippets Groups Projects
recv_device.py 13.6 KiB
Newer Older
#  Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
#  SPDX-License-Identifier: Apache-2.0
""" RECV Abstract Device Server for LOFAR2.0
from enum import IntEnum
from attribute_wrapper.attribute_wrapper import AttributeWrapper
from tango import AttrWriteType, DevString, DevLong
# PyTango imports
from tango import DebugIt
from tango.server import command, device_property, attribute
from tangostationcontrol.common.constants import (
    N_rcu,
    N_rcu_inp,
)
from tangostationcontrol.common.frequency_bands import bands
from tangostationcontrol.common.lofar_logging import device_logging_to_python
from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
from tangostationcontrol.devices.base_device_classes.opcua_device import OPCUADevice
from tangostationcontrol.common.device_decorators import only_in_states
from tangostationcontrol.metrics import device_metrics
logger = logging.getLogger()

__all__ = ["RECVDevice"]
class RCUType(IntEnum):
    Two types of receiver: RECVL and RECVH
Jan David Mol's avatar
Jan David Mol committed
    ],
    include=[
        "ANT_mask_RW",
        "RCU_mask_RW",
    ],
class RECVDevice(OPCUADevice):
    """
    RECV Device is an abstract Device which serves as a common base
    for RECVL and RECVH devices, containing all the common points and
    the shared logic of these two devices.
    """

Thomas Juerges's avatar
Thomas Juerges committed
    # -----------------
    # Device Properties
    # -----------------
        mandatory=False,
    RECVTR_monitor_rate_RW_default = device_property(
        dtype="DevLong64", mandatory=False, default_value=1
    )

    RCU_mask_RW_default = device_property(
        dtype="DevVarBooleanArray", mandatory=False, default_value=[True] * N_rcu
    RCU_attenuator_dB_RW_default = device_property(
    RCU_PWR_ANT_on_RW_default = device_property(
Jan David Mol's avatar
Jan David Mol committed
        default_value=[False] * N_rcu * N_rcu_inp,
        # turn power off by default in test setups, f.e. to prevent blowing up the noise sources
    TRANSLATOR_DEFAULT_SETTINGS = [
        "ANT_mask_RW",
        "RCU_mask_RW",
        "RECVTR_monitor_rate_RW",
    # ----- Timing values

    RCU_On_Off_timeout = device_property(
        doc="Maximum amount of time to wait after turning RCU(s) on or off",
        dtype="DevFloat",
        default_value=120.0,
        doc="Maximum amount of time to wait after turning dithering on or off",
        dtype="DevFloat",
Thomas Juerges's avatar
Thomas Juerges committed
    # ----------
    # Attributes
    # ----------
    TR_software_version_R = AttributeWrapper(
        comms_annotation=["TR_software_version_R"], datatype=str
    )

    ANT_mask_RW = AttributeWrapper(
        doc="Which antennas are physically connected.",
        comms_annotation=["ANT_mask_RW"],
        datatype=bool,
        dims=(N_rcu, N_rcu_inp),
        access=AttrWriteType.READ_WRITE,
    RECVTR_monitor_rate_RW = AttributeWrapper(
        comms_annotation=["RECVTR_monitor_rate_RW"],
        datatype=numpy.int64,
        access=AttrWriteType.READ_WRITE,

    RCU_mask_RW = AttributeWrapper(
        comms_annotation=["RCU_mask_RW"],
        dims=(N_rcu,),

    RECVTR_I2C_error_R = AttributeWrapper(
        comms_annotation=["RECVTR_I2C_error_R"], datatype=numpy.int64, dims=(N_rcu,)

    RECVTR_translator_busy_R = AttributeWrapper(
        comms_annotation=["RECVTR_translator_busy_R"], datatype=bool
    RCU_attenuator_dB_R = AttributeWrapper(
        comms_annotation=["RCU_attenuator_dB_R"],
        datatype=numpy.int64,
        dims=(N_rcu, N_rcu_inp),
    )
    RCU_attenuator_dB_RW = AttributeWrapper(
        comms_annotation=["RCU_attenuator_dB_RW"],
        datatype=numpy.int64,
        dims=(N_rcu, N_rcu_inp),
        access=AttrWriteType.READ_WRITE,
    RCU_band_select_R = AttributeWrapper(
        comms_annotation=["RCU_band_select_R"],
        datatype=numpy.int64,
        dims=(N_rcu, N_rcu_inp),
    )
    RCU_band_select_RW = AttributeWrapper(
        comms_annotation=["RCU_band_select_RW"],
        datatype=numpy.int64,
        dims=(N_rcu, N_rcu_inp),
        access=AttrWriteType.READ_WRITE,
    RCU_LED_red_on_R = AttributeWrapper(
        comms_annotation=["RCU_LED_red_on_R"], datatype=bool, dims=(N_rcu,)
    )
    RCU_LED_red_on_RW = AttributeWrapper(
        comms_annotation=["RCU_LED_red_on_RW"],
        datatype=bool,
        dims=(N_rcu,),
        access=AttrWriteType.READ_WRITE,

    RCU_LED_green_on_R = AttributeWrapper(
        comms_annotation=["RCU_LED_green_on_R"], datatype=bool, dims=(N_rcu,)
    )
    RCU_LED_green_on_RW = AttributeWrapper(
        comms_annotation=["RCU_LED_green_on_RW"],
        datatype=bool,
        dims=(N_rcu,),
        access=AttrWriteType.READ_WRITE,
    )

    RCU_TEMP_R = AttributeWrapper(
        comms_annotation=["RCU_TEMP_R"], datatype=numpy.float64, dims=(N_rcu,)

    RCU_PWR_3V3_R = AttributeWrapper(
        comms_annotation=["RCU_PWR_3V3_R"], datatype=numpy.float64, dims=(N_rcu,)
    RCU_PWR_1V8_R = AttributeWrapper(
        comms_annotation=["RCU_PWR_1V8_R"], datatype=numpy.float64, dims=(N_rcu,)
    )
    RCU_PWR_2V5_R = AttributeWrapper(
        comms_annotation=["RCU_PWR_2V5_R"], datatype=numpy.float64, dims=(N_rcu,)
    )

    RCU_PWR_ANT_VOUT_R = AttributeWrapper(
        comms_annotation=["RCU_PWR_ANT_VOUT_R"],
        datatype=numpy.float64,
        dims=(N_rcu, N_rcu_inp),

    RCU_PWR_ANT_VIN_R = AttributeWrapper(
        comms_annotation=["RCU_PWR_ANT_VIN_R"],
        datatype=numpy.float64,
        dims=(N_rcu, N_rcu_inp),
    RCU_PWR_ANT_IOUT_R = AttributeWrapper(
        comms_annotation=["RCU_PWR_ANT_IOUT_R"],
        datatype=numpy.float64,
        dims=(N_rcu, N_rcu_inp),
    )

    RCU_PWR_DIGITAL_on_R = AttributeWrapper(
        comms_annotation=["RCU_PWR_DIGITAL_on_R"], datatype=bool, dims=(N_rcu,)
    )

    RCU_PWR_good_R = AttributeWrapper(
        comms_annotation=["RCU_PWR_good_R"], datatype=bool, dims=(N_rcu,)
    )

    RCU_PWR_ANALOG_on_R = AttributeWrapper(
        comms_annotation=["RCU_PWR_ANALOG_on_R"], datatype=bool, dims=(N_rcu,)
    )

    RCU_PWR_ANT_on_R = AttributeWrapper(
        comms_annotation=["RCU_PWR_ANT_on_R"], datatype=bool, dims=(N_rcu, N_rcu_inp)
    )
    RCU_PWR_ANT_on_RW = AttributeWrapper(
        comms_annotation=["RCU_PWR_ANT_on_RW"],
        datatype=bool,
        dims=(N_rcu, N_rcu_inp),
        access=AttrWriteType.READ_WRITE,
    )

    RCU_PCB_ID_R = AttributeWrapper(
        comms_annotation=["RCU_PCB_ID_R"], datatype=numpy.int64, dims=(N_rcu,)

    RCU_PCB_version_R = AttributeWrapper(
        comms_annotation=["RCU_PCB_version_R"], datatype=str, dims=(N_rcu,)

    RCU_PCB_number_R = AttributeWrapper(
        comms_annotation=["RCU_PCB_number_R"], datatype=str, dims=(N_rcu,)

    RCU_ADC_locked_R = AttributeWrapper(
        comms_annotation=["RCU_ADC_locked_R"], datatype=bool, dims=(N_rcu, N_rcu_inp)

    RCU_DTH_freq_R = AttributeWrapper(
        comms_annotation=["RCU_DTH_freq_R"],
        datatype=numpy.int64,
        dims=(N_rcu, N_rcu_inp),
    RCU_DTH_freq_RW = AttributeWrapper(
        comms_annotation=["RCU_DTH_freq_RW"],
        dims=(N_rcu, N_rcu_inp),
    RCU_DTH_PWR_R = AttributeWrapper(
        comms_annotation=["RCU_DTH_PWR_R"],
        datatype=numpy.float64,
        doc="RCU Dither source power (dBm). Range -25 to -4.",
    RCU_DTH_PWR_RW = AttributeWrapper(
        comms_annotation=["RCU_DTH_PWR_RW"],
        datatype=numpy.float64,
        dims=(N_rcu, N_rcu_inp),
        doc="RCU Dither source power (dBm). Range -25 to -4.",
    )

    RCU_DTH_on_R = AttributeWrapper(
        comms_annotation=["RCU_DTH_on_R"], datatype=bool, dims=(N_rcu, N_rcu_inp)
    RCU_LED_colour_R = attribute(
        dtype=(numpy.uint32,), max_dim_x=N_rcu, fisallowed="is_attribute_access_allowed"
    )
    ANT_error_R = attribute(
        dtype=((bool,),),
        max_dim_y=N_rcu,
        max_dim_x=N_rcu_inp,
        fisallowed="is_attribute_access_allowed",
    )
    RCU_error_R = attribute(
        dtype=(bool,), max_dim_x=N_rcu, fisallowed="is_attribute_access_allowed"
    )
    RECV_IOUT_error_R = attribute(
        dtype=((bool,),),
        max_dim_y=N_rcu,
        max_dim_x=N_rcu_inp,
        fisallowed="is_attribute_access_allowed",
    )
    RECV_TEMP_error_R = attribute(
        dtype=(bool,),
        max_dim_x=N_rcu,
        fisallowed="is_attribute_access_allowed",
    )
    RECV_VOUT_error_R = attribute(
        dtype=(bool,), max_dim_x=N_rcu, fisallowed="is_attribute_access_allowed"
    )
    def read_RCU_LED_colour_R(self):
        return (
            2 * self.read_attribute("RCU_LED_green_on_R")
            + 4 * self.read_attribute("RCU_LED_red_on_R")
        ).astype(numpy.uint32)
        # Don't apply the mask here --- we always want to know if things get too hot!
    def read_ANT_error_R(self):
        return self.read_attribute("ANT_mask_RW") & (
            ~self.read_attribute("RCU_ADC_locked_R")
        )

    def read_RCU_error_R(self):
        return self.read_attribute("RCU_mask_RW") & (
            (self.read_attribute("RECVTR_I2C_error_R") > 0)
            | self.alarm_val("RCU_PCB_ID_R")
        )

    def read_RECV_IOUT_error_R(self):
        return self.read_attribute("ANT_mask_RW") & (
            self.alarm_val("RCU_PWR_ANT_IOUT_R")
        )

    def read_RECV_VOUT_error_R(self):
                self.alarm_val("RCU_PWR_ANT_VIN_R")
                | self.alarm_val("RCU_PWR_ANT_VOUT_R")
                self.alarm_val("RCU_PWR_1V8_R")
                | self.alarm_val("RCU_PWR_2V5_R")
                | self.alarm_val("RCU_PWR_3V3_R")
                | ~self.read_attribute("RCU_PWR_DIGITAL_on_R")
                | ~self.read_attribute("RCU_PWR_good_R")
Thomas Juerges's avatar
Thomas Juerges committed
    # --------
    # overloaded functions
    # --------
    def _read_hardware_powered_fraction_R(self):
        """Read attribute which monitors the power"""

        mask = self.read_attribute("RCU_mask_RW")
        powered = self.read_attribute("RCU_PWR_good_R")

        try:
            return numpy.count_nonzero(powered & mask) / numpy.count_nonzero(mask)
        except ZeroDivisionError:
            return 1.0

    def _power_hardware_on(self):
        """Power the RCUs."""

        self.RCU_on()
        self.wait_attribute("RECVTR_translator_busy_R", False, self.RCU_On_Off_timeout)

        # NB: Powering on RCUs causes DTH to be turned off, which is what we want
        #     to create a steady baseline after powerup. This is done by RECVTR
        #     even if the RCUs are already powered on.
        self.RCU_DTH_setup()

        # Let dithering be configured in the background since other setup
        # does not depend on it.
        # (do not wait for RECVTR_translator_busy_R == False)
    def _power_hardware_off(self):
        """Turns off the RCUs."""
        # Save actual mask values
Stefano Di Frischia's avatar
Stefano Di Frischia committed
        RCU_mask = self.proxy.RCU_mask_RW
        # Set the mask to all Trues
        self.RCU_mask_RW = [True] * N_rcu
        # Turn off the RCUs
        self.RCU_off()
        self.wait_attribute("RECVTR_translator_busy_R", False, self.RCU_On_Off_timeout)
        # Restore the mask
        self.RCU_mask_RW = RCU_mask
Thomas Juerges's avatar
Thomas Juerges committed
    # --------
    # Commands
    # --------
Thomas Juerges's avatar
Thomas Juerges committed
    @command()
    @DebugIt()
    @only_in_states(DEFAULT_COMMAND_STATES)
Thomas Juerges's avatar
Thomas Juerges committed
    def RCU_off(self):
        """
Thomas Juerges's avatar
Thomas Juerges committed
        :return:None
        """
        self.opcua_connection.call_method(["RCU_off"])
Thomas Juerges's avatar
Thomas Juerges committed
    @command()
    @DebugIt()
    @only_in_states(DEFAULT_COMMAND_STATES)
Thomas Juerges's avatar
Thomas Juerges committed
    def RCU_on(self):
        """
Thomas Juerges's avatar
Thomas Juerges committed
        :return:None
        """
        self.opcua_connection.call_method(["RCU_on"])
Thomas Juerges's avatar
Thomas Juerges committed
    @command()
    @DebugIt()
    @only_in_states(DEFAULT_COMMAND_STATES)
Thomas Juerges's avatar
Thomas Juerges committed
        """

        :return:None
        """
        self.opcua_connection.call_method(["RCU_DTH_off"])
Thomas Juerges's avatar
Thomas Juerges committed

    @command()
    @DebugIt()
    @only_in_states(DEFAULT_COMMAND_STATES)
Thomas Juerges's avatar
Thomas Juerges committed
        """

        :return:None
        """
        self.opcua_connection.call_method(["RCU_DTH_on"])
    @command()
    @DebugIt()
    @only_in_states(DEFAULT_COMMAND_STATES)
    def RCU_DTH_setup(self):
        """
        Configure dither power (RCU_DTH_PWR) and unique frequency (RCU_DTH_freq)
        for each signal chain

        :return:None
        """
        self.opcua_connection.call_method(["RCU_DTH_setup"])

    @command(dtype_in=DevString, dtype_out=DevLong)
    def get_rcu_band_from_filter(self, filter_name: str):
        """return the rcu band given the filter name"""
        return bands[filter_name].rcu_band