Skip to content
Snippets Groups Projects
Select Git revision
  • cdda3df46b72064723d60f2c71bcd5febfd47495
  • master default protected
  • L2SS-1957-remove-pcon-control
  • expose-prometheus
  • stabilise-landing-page
  • all-stations-lofar2
  • L2SS-2357-fix-ruff
  • control-single-hba-and-lba
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • fix-build-dind
  • L2SS-2153--Improve-Error-Handling
  • L2SS-2153-Add-Grpc-Gateway-support
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
  • v0.52.1-rc1 protected
  • v0.51.9-6 protected
  • v0.51.9-5 protected
  • v0.51.9-4 protected
  • v0.51.9-3 protected
  • v0.51.9-2 protected
  • v0.51.9-1 protected
  • v0.51.9 protected
  • v0.51.8 protected
  • v0.39.15-wsrttwo protected
  • v0.39.15-wsrt protected
  • v0.39.14-wsrt protected
  • v0.51.6 protected
  • v0.51.5-1 protected
  • v0.51.5 protected
41 results

recv.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    recv.py 15.94 KiB
    # -*- coding: utf-8 -*-
    #
    # This file is part of the RECV project
    #
    #
    #
    # Distributed under the terms of the APACHE license.
    # See LICENSE.txt for more info.
    
    """ RECV Device Server for LOFAR2.0
    
    """
    
    # PyTango imports
    from tango import DebugIt
    from tango.server import command
    from tango.server import device_property, attribute
    from tango import AttrWriteType, DevVarFloatArray, DevString, DevLong
    
    import numpy
    
    # Additional import
    from tangostationcontrol.common.entrypoint import entry
    from tangostationcontrol.common.lofar_logging import device_logging_to_python
    from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
    from tangostationcontrol.devices.device_decorators import only_in_states
    from tangostationcontrol.devices.opcua_device import opcua_device
    from tangostationcontrol.devices.lofar_device import lofar_device
    
    import logging
    logger = logging.getLogger()
    
    __all__ = ["RECV", "main"]
    
    @device_logging_to_python()
    class RECV(opcua_device):
        """RECV device server for LOFAR2.0, built on ``opcua_device``."""

        # Maps a filter name onto an RCU band number. Consulted by the
        # get_rcu_band_from_filter command, which answers -1 for names that
        # are not listed here.
        FILTER_RCU_DICT = {
            "LBA_10_90": 1,
            "LBA_10_70": 1,
            "LBA_30_90": 2,
            "LBA_30_70": 2,
            "HBA_170_230": 1,
            "HBA_110_190": 2,
            "HBA_210_250": 4
        }
        
        # -----------------
        # Device Properties
        # -----------------
    
        # ----- Default settings
        # NOTE(review): each property below is named after an attribute of this
        # class plus a "_default" suffix. Presumably the base device writes
        # these values to the matching attribute when defaults are applied;
        # that code is not in this file -- confirm in lofar_device.
    
        # Default for ANT_mask_RW: every entry enabled.
        ANT_mask_RW_default = device_property(
            dtype='DevVarBooleanArray',
            mandatory=False,
            default_value=[[True] * 3] * 32
        )
    
        # Default for RCU_mask_RW: every entry enabled.
        RCU_mask_RW_default = device_property(
            dtype='DevVarBooleanArray',
            mandatory=False,
            default_value=[True] * 32
        )
    
        # Default for RCU_band_select_RW: band 0 everywhere.
        RCU_band_select_RW_default = device_property(
            dtype='DevVarLong64Array',
            mandatory=False,
            default_value=[[0] * 3] * 32
        )
    
        # Default for RCU_PWR_ANT_on_RW: antenna power off. Power is kept off
        # by default in test setups, e.g. to prevent blowing up the noise
        # sources.
        #
        # The "_default" suffix is essential. Under its former name,
        # RCU_PWR_ANT_on_RW, this binding was replaced by the attribute of the
        # same name that is declared further down in this class body, so the
        # property was silently dropped before Tango could register it.
        # The dtype is boolean, like ANT_mask_RW_default, because the
        # attribute it belongs to is declared with datatype=bool.
        # NOTE(review): assumes the base device applies "<attribute>_default"
        # properties to "<attribute>" -- confirm in lofar_device.
        RCU_PWR_ANT_on_RW_default = device_property(
            dtype='DevVarBooleanArray',
            mandatory=False,
            default_value=[[False] * 3] * 32
        )
    
        # Default for RECVTR_monitor_rate_RW.
        # NOTE(review): the unit of the rate is not visible in this file.
        RECVTR_monitor_rate_RW_default = device_property(
            dtype='DevLong64',
            mandatory=False,
            default_value=1
        )
    
        # Names of attributes of this class, listed separately from the other
        # defaults. The list is not read anywhere in this file.
        # NOTE(review): presumably the base device applies these defaults
        # before the others (they are the masks) -- confirm in lofar_device.
        TRANSLATOR_DEFAULT_SETTINGS = [
            'ANT_mask_RW',
            'RCU_mask_RW'
        ]
    
        # ----- Timing values
    
        # Passed to wait_attribute() as the timeout after RCU_off() / RCU_on().
        RCU_On_Off_timeout = device_property(
            doc='Maximum amount of time to wait after turning RCU(s) on or off',
            dtype='DevFloat',
            mandatory=False,
            default_value=30.0
        )
    
        # Not referenced elsewhere in this file.
        RCU_DTH_On_Off_timeout = device_property(
            doc='Maximum amount of time to wait after turning dithering on or off',
            dtype='DevFloat',
            mandatory=False,
            default_value=30.0
        )
    
        # ----- Calibration values
        
        # Delay belonging to each of the 32 delay-step settings; the index into
        # this table is the step number that _calculate_HBAT_bf_delay_steps()
        # returns. The values are compared against delays given in seconds.
        HBAT_bf_delay_step_delays = device_property(
            dtype="DevVarFloatArray",
            mandatory=False,
            default_value=numpy.array([
                0.0,        0.5228E-9,  0.9797E-9,  1.4277E-9,  1.9055E-9,
                2.4616E-9,  2.9539E-9,  3.4016E-9,  3.8076E-9,  4.3461E-9,
                4.9876E-9,  5.4894E-9,  5.7973E-9,  6.2707E-9,  6.8628E-9,
                7.3989E-9,  8.0673E-9,  8.6188E-9,  9.1039E-9,  9.5686E-9,
                10.0463E-9, 10.5774E-9, 11.0509E-9, 11.5289E-9, 11.9374E-9,
                12.4524E-9, 13.0842E-9, 13.5936E-9, 13.9198E-9, 14.4087E-9,
                14.9781E-9, 15.5063E-9
            ],dtype=numpy.float64)
        )
    
        # 32 values that _calculate_HBAT_bf_delay_steps() adds to the requested
        # delays before they are matched against the step table. Zero by default.
        HBAT_signal_input_delays = device_property(
            doc='Signal input delay calibration values for the elements within a tile.',
            dtype='DevVarFloatArray',
            mandatory=False,
            default_value = numpy.zeros((32,), dtype=numpy.float64)
        )
    
        # ----------
        # Attributes
        # ----------
        # Every attribute below is an attribute_wrapper whose comms_annotation
        # holds a single name, identical to the attribute's own name. Names
        # ending in _RW are declared with access=AttrWriteType.READ_WRITE;
        # names ending in _R do not pass an access argument.
        # NOTE(review): the meaning and order of the two entries in dims is
        # defined by attribute_wrapper, which is not part of this file. The
        # readers further down reduce (3,32) attributes with .any(axis=1) into
        # 32 values, which suggests the data arrives as 32 rows of 3 -- confirm.
        ANT_mask_RW                  = attribute_wrapper(comms_annotation=["ANT_mask_RW"               ],datatype=bool         , dims=(3,32), access=AttrWriteType.READ_WRITE)
        
        # The HBAT beamformer delays represent 32 delays for each of the 96 inputs.
        # The 32 delays deconstruct as delays[polarisation][dipole], and each delay is the number of 'delay steps' to apply (0.5ns for HBAT1).
        HBAT_BF_delay_steps_R        = attribute_wrapper(comms_annotation=["HBAT_BF_delay_steps_R"     ],datatype=numpy.int64  , dims=(32,96))
        HBAT_BF_delay_steps_RW       = attribute_wrapper(comms_annotation=["HBAT_BF_delay_steps_RW"    ],datatype=numpy.int64  , dims=(32,96), access=AttrWriteType.READ_WRITE)
        HBAT_LED_on_R                = attribute_wrapper(comms_annotation=["HBAT_LED_on_R"             ],datatype=bool         , dims=(32,96))
        HBAT_LED_on_RW               = attribute_wrapper(comms_annotation=["HBAT_LED_on_RW"            ],datatype=bool         , dims=(32,96), access=AttrWriteType.READ_WRITE)
        HBAT_PWR_LNA_on_R            = attribute_wrapper(comms_annotation=["HBAT_PWR_LNA_on_R"         ],datatype=bool         , dims=(32,96))
        HBAT_PWR_LNA_on_RW           = attribute_wrapper(comms_annotation=["HBAT_PWR_LNA_on_RW"        ],datatype=bool         , dims=(32,96), access=AttrWriteType.READ_WRITE)
        HBAT_PWR_on_R                = attribute_wrapper(comms_annotation=["HBAT_PWR_on_R"             ],datatype=bool         , dims=(32,96))
        HBAT_PWR_on_RW               = attribute_wrapper(comms_annotation=["HBAT_PWR_on_RW"            ],datatype=bool         , dims=(32,96), access=AttrWriteType.READ_WRITE)
        RCU_ADC_locked_R             = attribute_wrapper(comms_annotation=["RCU_ADC_locked_R"          ],datatype=bool         , dims=(3,32))
        RCU_attenuator_dB_R          = attribute_wrapper(comms_annotation=["RCU_attenuator_dB_R"       ],datatype=numpy.int64  , dims=(3,32))
        RCU_attenuator_dB_RW         = attribute_wrapper(comms_annotation=["RCU_attenuator_dB_RW"      ],datatype=numpy.int64  , dims=(3,32), access=AttrWriteType.READ_WRITE)
        RCU_band_select_R            = attribute_wrapper(comms_annotation=["RCU_band_select_R"         ],datatype=numpy.int64  , dims=(3,32))
        RCU_band_select_RW           = attribute_wrapper(comms_annotation=["RCU_band_select_RW"        ],datatype=numpy.int64  , dims=(3,32), access=AttrWriteType.READ_WRITE)
        RCU_DTH_freq_R               = attribute_wrapper(comms_annotation=["RCU_DTH_freq_R"            ],datatype=numpy.int64  , dims=(3,32))
        RCU_DTH_freq_RW              = attribute_wrapper(comms_annotation=["RCU_DTH_freq_RW"           ],datatype=numpy.int64  , dims=(3,32), access=AttrWriteType.READ_WRITE)
        RCU_DTH_on_R                 = attribute_wrapper(comms_annotation=["RCU_DTH_on_R"              ],datatype=bool         , dims=(3,32))
        RCU_LED_green_on_R           = attribute_wrapper(comms_annotation=["RCU_LED_green_on_R"        ],datatype=bool         , dims=(32,))
        RCU_LED_green_on_RW          = attribute_wrapper(comms_annotation=["RCU_LED_green_on_RW"       ],datatype=bool         , dims=(32,), access=AttrWriteType.READ_WRITE)
        RCU_LED_red_on_R             = attribute_wrapper(comms_annotation=["RCU_LED_red_on_R"          ],datatype=bool         , dims=(32,))
        RCU_LED_red_on_RW            = attribute_wrapper(comms_annotation=["RCU_LED_red_on_RW"         ],datatype=bool         , dims=(32,), access=AttrWriteType.READ_WRITE)
        RCU_mask_RW                  = attribute_wrapper(comms_annotation=["RCU_mask_RW"               ],datatype=bool         , dims=(32,), access=AttrWriteType.READ_WRITE)
        RCU_PCB_ID_R                 = attribute_wrapper(comms_annotation=["RCU_PCB_ID_R"              ],datatype=numpy.int64  , dims=(32,))
        RCU_PCB_number_R             = attribute_wrapper(comms_annotation=["RCU_PCB_number_R"          ],datatype=str          , dims=(32,))
        RCU_PCB_version_R            = attribute_wrapper(comms_annotation=["RCU_PCB_version_R"         ],datatype=str          , dims=(32,))
        RCU_PWR_1V8_R                = attribute_wrapper(comms_annotation=["RCU_PWR_1V8_R"             ],datatype=numpy.float64, dims=(32,))
        RCU_PWR_2V5_R                = attribute_wrapper(comms_annotation=["RCU_PWR_2V5_R"             ],datatype=numpy.float64, dims=(32,))
        RCU_PWR_3V3_R                = attribute_wrapper(comms_annotation=["RCU_PWR_3V3_R"             ],datatype=numpy.float64, dims=(32,))
        RCU_PWR_ANALOG_on_R          = attribute_wrapper(comms_annotation=["RCU_PWR_ANALOG_on_R"       ],datatype=bool         , dims=(32,))
        RCU_PWR_ANT_IOUT_R           = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_IOUT_R"        ],datatype=numpy.float64, dims=(3,32))
        RCU_PWR_ANT_on_R             = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_on_R"          ],datatype=bool         , dims=(3,32))
        RCU_PWR_ANT_on_RW            = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_on_RW"         ],datatype=bool         , dims=(3,32), access=AttrWriteType.READ_WRITE)
        RCU_PWR_ANT_VIN_R            = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_VIN_R"         ],datatype=numpy.float64, dims=(3,32))
        RCU_PWR_ANT_VOUT_R           = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_VOUT_R"        ],datatype=numpy.float64, dims=(3,32))
        RCU_PWR_DIGITAL_on_R         = attribute_wrapper(comms_annotation=["RCU_PWR_DIGITAL_on_R"      ],datatype=bool         , dims=(32,))
        RCU_PWR_good_R               = attribute_wrapper(comms_annotation=["RCU_PWR_good_R"            ],datatype=bool         , dims=(32,))
        RCU_TEMP_R                   = attribute_wrapper(comms_annotation=["RCU_TEMP_R"                ],datatype=numpy.float64, dims=(32,))
        RECVTR_I2C_error_R           = attribute_wrapper(comms_annotation=["RECVTR_I2C_error_R"        ],datatype=numpy.int64  , dims=(32,))
        # The last two attributes are declared without dims.
        RECVTR_monitor_rate_RW       = attribute_wrapper(comms_annotation=["RECVTR_monitor_rate_RW"    ],datatype=numpy.int64  , access=AttrWriteType.READ_WRITE)
        RECVTR_translator_busy_R     = attribute_wrapper(comms_annotation=["RECVTR_translator_busy_R"  ],datatype=bool)
    
        # ----------
        # Summarising Attributes
        # ----------
        RCU_LED_colour_R = attribute(
            dtype=(numpy.uint32,),
            max_dim_x=32,
            fisallowed="is_attribute_access_allowed",
        )

        def read_RCU_LED_colour_R(self):
            """Fold both LED states into one number per entry.

            A lit green LED contributes 2 and a lit red LED contributes 4, so
            the result is 0, 2, 4 or 6, returned as numpy.uint32.
            """
            green_on = self.read_attribute("RCU_LED_green_on_R")
            red_on = self.read_attribute("RCU_LED_red_on_R")
            colour_code = 2 * green_on + 4 * red_on
            return colour_code.astype(numpy.uint32)
    
        RCU_error_R = attribute(
            dtype=(bool,),
            max_dim_x=32,
            fisallowed="is_attribute_access_allowed",
        )
        ANT_error_R = attribute(
            dtype=((bool,),),
            max_dim_x=3,
            max_dim_y=32,
            fisallowed="is_attribute_access_allowed",
        )

        def read_RCU_error_R(self):
            """Flag the entries enabled in RCU_mask_RW that are in error.

            An entry is in error when RECVTR_I2C_error_R is above zero or when
            alarm_val() reports RCU_PCB_ID_R.
            """
            enabled = self.read_attribute("RCU_mask_RW")
            i2c_failed = self.read_attribute("RECVTR_I2C_error_R") > 0
            pcb_id_alarm = self.alarm_val("RCU_PCB_ID_R")
            return enabled & (i2c_failed | pcb_id_alarm)
    
        def read_ANT_error_R(self):
            """Flag the entries enabled in ANT_mask_RW whose ADC is not locked."""
            enabled = self.read_attribute("ANT_mask_RW")
            adc_unlocked = ~self.read_attribute("RCU_ADC_locked_R")
            return enabled & adc_unlocked
    
        # Summaries of current, temperature and voltage alarms; each has a
        # read_<name>() method in this class.
        #
        # These attributes are guarded with is_attribute_access_allowed, just
        # like RCU_LED_colour_R, RCU_error_R and ANT_error_R: their readers use
        # the same read_attribute() / alarm_val() calls, so access to them is
        # gated in the same way.
        RECV_IOUT_error_R          = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_access_allowed")
        # Polled with a period of 1000.
        RECV_TEMP_error_R          = attribute(dtype=(bool,), max_dim_x=32, polling_period=1000, fisallowed="is_attribute_access_allowed")
        RECV_VOUT_error_R          = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_access_allowed")
    
        def read_RECV_IOUT_error_R(self):
            """Report antenna output-current alarms, one flag per row.

            Alarms on RCU_PWR_ANT_IOUT_R are kept only where ANT_mask_RW is
            set, then reduced with any() along axis 1.
            """
            enabled = self.read_attribute("ANT_mask_RW")
            current_alarm = self.alarm_val("RCU_PWR_ANT_IOUT_R")
            masked_alarm = enabled & current_alarm
            return masked_alarm.any(axis=1)
    
        def read_RECV_TEMP_error_R(self):
            """Report the alarm state of RCU_TEMP_R.

            No mask is applied on purpose: overheating must always be
            reported, also for entries that are masked out.
            """
            temperature_alarm = self.alarm_val("RCU_TEMP_R")
            return temperature_alarm
    
        def read_RECV_VOUT_error_R(self):
            """Report voltage problems, combining antenna and RCU supplies.

            The antenna part takes the alarms on RCU_PWR_ANT_VIN_R and
            RCU_PWR_ANT_VOUT_R where ANT_mask_RW is set, reduced with any()
            along axis 1. The RCU part, where RCU_mask_RW is set, takes the
            alarms on the 1V8, 2V5 and 3V3 supplies plus a digital power
            supply that is off or a power-good flag that is not set. The
            result is the element-wise OR of both parts.
            """
            # Antenna supplies
            ant_enabled = self.read_attribute("ANT_mask_RW")
            ant_alarm = self.alarm_val("RCU_PWR_ANT_VIN_R")
            ant_alarm = ant_alarm | self.alarm_val("RCU_PWR_ANT_VOUT_R")
            ant_error = (ant_enabled & ant_alarm).any(axis=1)

            # RCU supplies
            rcu_enabled = self.read_attribute("RCU_mask_RW")
            rcu_alarm = self.alarm_val("RCU_PWR_1V8_R")
            rcu_alarm = rcu_alarm | self.alarm_val("RCU_PWR_2V5_R")
            rcu_alarm = rcu_alarm | self.alarm_val("RCU_PWR_3V3_R")
            rcu_alarm = rcu_alarm | ~self.read_attribute("RCU_PWR_DIGITAL_on_R")
            rcu_alarm = rcu_alarm | ~self.read_attribute("RCU_PWR_good_R")
            rcu_error = rcu_enabled & rcu_alarm

            return ant_error | rcu_error
    
        # --------
        # overridden methods
        # --------
        def properties_changed(self):
            """Recompute the values derived from the device properties.

            Runs the parent implementation first, then refreshes
            HBAT_bf_delay_offset.
            """
            super().properties_changed()

            # An HBAT can only delay, never advance. Beam delays are expressed
            # relative to the centre of the tile, which makes roughly half of
            # them negative. Adding one fixed offset to all of them -- the mean
            # of the available delay steps -- should keep every delay positive,
            # whatever the pointing direction.
            step_delays = self.HBAT_bf_delay_step_delays
            self.HBAT_bf_delay_offset = numpy.mean(step_delays)
        
        def _initialise_hardware(self):
            """Power-cycle the RCUs: switch them off, then on again.

            After each switch, wait (bounded by RCU_On_Off_timeout) until
            RECVTR_translator_busy_R reads False.
            """
            for switch_rcus in (self.RCU_off, self.RCU_on):
                switch_rcus()
                self.wait_attribute("RECVTR_translator_busy_R", False, self.RCU_On_Off_timeout)
        
        def _disable_hardware(self):
            """Switch off all RCUs, regardless of the current RCU mask.

            The mask is temporarily set to all True so that RCU_off() covers
            every RCU, and the previous mask is put back afterwards. The
            restore happens in a ``finally`` clause: if RCU_off() or the wait
            for the translator raises (for example on a timeout), the
            exception still propagates, but the device is not left with its
            configured mask overwritten.
            """
            # Remember the mask that is in effect right now.
            # NOTE(review): the mask is read through self.proxy but written
            # through self directly, exactly as before -- confirm that both
            # paths address the same value.
            saved_mask = self.proxy.RCU_mask_RW

            # Select every RCU
            self.RCU_mask_RW = [True] * 32
            try:
                # Turn off the RCUs and wait for the translator to finish
                self.RCU_off()
                self.wait_attribute("RECVTR_translator_busy_R", False, self.RCU_On_Off_timeout)
            finally:
                # Restore the mask, also when switching off failed
                self.RCU_mask_RW = saved_mask
    
        # --------
        # internal functions
        # --------
        def _calculate_HBAT_bf_delay_steps(self, delays: numpy.ndarray) -> numpy.ndarray:
            """Convert signal path delays (in seconds) into delay-step numbers.

            The input is duplicated along its last axis (once per
            polarisation), HBAT_signal_input_delays is added, and every
            resulting delay -- shifted by HBAT_bf_delay_offset so that it is
            positive -- is replaced by the index of the closest entry in
            HBAT_bf_delay_step_delays.

            The nearest step is found with one broadcast comparison instead of
            a Python call per element, which also makes an input without any
            rows yield an empty result rather than an error.

            :param delays: delays in seconds. The command in this class passes
                           an array of shape (tiles, 16).
            :return: integer array of step indices, shaped like ``delays``
                     with its last axis doubled.
            """
            # Duplicate delay values per polarisation
            polarised_delays = numpy.tile(delays, 2)

            # Add signal input delay
            calibrated_delays = numpy.add(polarised_delays, self.HBAT_signal_input_delays)

            # Shift by HBAT_bf_delay_offset to obtain strictly positive delays.
            # Everything is computed in float64, whatever the input precision.
            shifted_delays = numpy.asarray(calibrated_delays, dtype=numpy.float64) + self.HBAT_bf_delay_offset
            step_delays = numpy.asarray(self.HBAT_bf_delay_step_delays, dtype=numpy.float64)

            # Distance from every delay to every available step. The extra
            # trailing axis enumerates the steps, so argmin along it yields
            # the index of the closest step (the lowest index on a tie).
            distances = numpy.abs(step_delays - shifted_delays[..., numpy.newaxis])
            return distances.argmin(axis=-1)
    
        # --------
        # Commands
        # --------
        @command(dtype_in=DevVarFloatArray, dtype_out=DevVarFloatArray)
        def calculate_HBAT_bf_delay_steps(self, delays: numpy.ndarray):
            """Convert a flat array of signal path delays (in seconds) into delay steps.

            The input is regrouped into rows of 16 values, so its length must
            be a multiple of 16. The computed steps are returned flattened.

            NOTE(review): the returned values are step indices (integers), yet
            the command declares a float array as output type -- confirm that
            this is what clients expect.
            """
            # One row per tile, however many tiles were passed in
            per_tile_delays = numpy.array(delays).reshape(-1, 16)

            # Translate every delay into its delay step and hand back a flat array
            return self._calculate_HBAT_bf_delay_steps(per_tile_delays).flatten()
        
        @command(dtype_in=DevString, dtype_out=DevLong)
        def get_rcu_band_from_filter(self, filter_name: str):
            """Return the RCU band listed for ``filter_name`` in FILTER_RCU_DICT, or -1 if the name is unknown."""
            try:
                return self.FILTER_RCU_DICT[filter_name]
            except KeyError:
                return -1
    
        @command()
        @DebugIt()
        @only_in_states(lofar_device.DEFAULT_COMMAND_STATES)
        def RCU_off(self):
            """Call the ``RCU_off`` method over the OPC UA connection.

            :return:None
            """
            method_path = ["RCU_off"]
            self.opcua_connection.call_method(method_path)
    
        @command()
        @DebugIt()
        @only_in_states(lofar_device.DEFAULT_COMMAND_STATES)
        def RCU_on(self):
            """Call the ``RCU_on`` method over the OPC UA connection.

            :return:None
            """
            method_path = ["RCU_on"]
            self.opcua_connection.call_method(method_path)
    
        @command()
        @DebugIt()
        @only_in_states(lofar_device.DEFAULT_COMMAND_STATES)
        def RCU_DTH_off(self):
            """Call the ``RCU_DTH_off`` method over the OPC UA connection.

            :return:None
            """
            method_path = ["RCU_DTH_off"]
            self.opcua_connection.call_method(method_path)
    
        @command()
        @DebugIt()
        @only_in_states(lofar_device.DEFAULT_COMMAND_STATES)
        def RCU_DTH_on(self):
            """Call the ``RCU_DTH_on`` method over the OPC UA connection.

            :return:None
            """
            method_path = ["RCU_DTH_on"]
            self.opcua_connection.call_method(method_path)
    
    # ----------
    # Run server
    # ----------
    def main(**kwargs):
        """Entry point of the RECV module.

        Hands the RECV device class and all keyword arguments to ``entry``
        and returns whatever ``entry`` returns.
        """
        result = entry(RECV, **kwargs)
        return result