Skip to content
Snippets Groups Projects
Select Git revision
  • c5e24a6b830e3e21d12121ee8caddd067510cb92
  • master default protected
  • st-1502-pytango-9.4.0
  • st-1426-test-tango-9_4_0
  • low-359-named-pvc
  • st-1381-base-image-to-ubuntu
  • bang-26
  • low-331-alpine-poetry-image
  • st-1260-delete-depencies
  • at6-1082-mount-host-path
  • st-974-probes-for-tangodb
  • test-pipeline
  • st-966-changelog-template-job
  • st-915-pytango
  • st-933-publish-raw-packages
  • sar-313-exploratory-work
  • ST-758
  • at6-700-image
  • st-581-car
  • AT1-709-configure-archiver-script
  • st-565
  • 0.4.3
  • 0.4.2
  • 0.4.1
  • 0.4.0
  • 0.3.26
  • 0.3.25
  • 0.3.24
  • 0.3.23
  • 0.3.22
  • 0.3.21
  • 0.3.20
  • 0.3.19
  • 0.3.18
  • 0.3.17
  • 0.3.16
  • 0.3.15
  • 0.3.14
  • 0.3.13
  • 0.3.12
  • 0.3.11
41 results

tango_tools.feature

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    antennafield_device.py 35.27 KiB
    #  Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
    #  SPDX-License-Identifier: Apache-2.0
    
    """AntennaField Abstract Device Server for LOFAR2.0"""
    
    import logging
    import datetime
    from enum import IntEnum
    from typing import List, Dict
    
    import numpy
    
    # PyTango imports
    from tango import (
        AttrWriteType,
        DevVarBooleanArray,
        DebugIt,
    )
    from tango.server import device_property, attribute, command
    
    # Additional import
    from tangostationcontrol.beam.geo import ETRS_to_ITRF
    from tangostationcontrol.beam.geo import GEO_to_GEOHASH
    from tangostationcontrol.beam.geo import ITRF_to_GEO
    from tangostationcontrol.common.antennas import antenna_set_to_mask
    from tangostationcontrol.common.cables import cable_types
    from tangostationcontrol.common.constants import (
        MAX_ANTENNA,
        N_pol,
        N_xyz,
        N_latlong,
        N_pn,
        A_pn,
        N_ANTENNA_SETS,
    )
    from tangostationcontrol.common.device_decorators import only_in_states, log_exceptions
    from tangostationcontrol.common.frequency_bands import bands, Band
    from tangostationcontrol.common.lofar_logging import (
        device_logging_to_python,
    )
    from tangostationcontrol.common.proxies.proxy import create_device_proxy
    from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
    from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice
    from tangostationcontrol.devices.base_device_classes.mapper import (
        MappedAttribute,
        MappingKeys,
        AntennaToSdpMapper,
        RecvDeviceWalker,
    )
    from tangostationcontrol.devices.types import DeviceTypes
    from tangostationcontrol.metrics import device_metrics
    
    logger = logging.getLogger()
    
    __all__ = ["AF", "AntennaUse", "AntennaStatus"]
    
    
    class AntennaUse(IntEnum):
        """Usage policy that can be configured for an individual antenna."""

        # Use the antenna only if its status is OK or SUSPICIOUS.
        AUTO = 0
        # Force the antenna on, regardless of its status.
        ON = 1
        # Force the antenna off, regardless of its status.
        OFF = 2
    
    
    class AntennaStatus(IntEnum):
        """Operational status that can be recorded for an individual antenna."""

        # NB: the numeric order is significant. The usage mask compares statuses
        # with "<= SUSPICIOUS" to decide whether an antenna in AUTO mode is used.
        OK = 0
        SUSPICIOUS = 1
        BROKEN = 2
        BEYOND_REPAIR = 3
        NOT_AVAILABLE = 4
    
    
    @device_logging_to_python()
    @device_metrics(
        exclude=[
            "FPGA_sdp_info_*",
            "*_RW",
        ],
        include=[
            "ANT_mask_RW",
            "Frequency_Band_RW",
        ],
    )
    class AF(LOFARDevice):
        """
        AntennaField Device is an abstract Device which serves as a common base
        for HBA and LBA devices, containing all the common points and
        the shared logic of these two devices.
    
        Manages the antennas in a single antenna field, by acting as a
        mapping onto one or more RECV devices.
    
        The antenna field models a number of antennas, each of which
        carries:
          * a position in Antenna_Reference_ETRS/ITRF,
          * a power mapping onto an RCU in Power_to_RECV_mapping,
          * a control mapping onto an RCU in Control_to_RECV_mapping.
    
        Furthermore, a central field reference position for the
        antenna field is maintained in Antenna_Field_Reference_ETRS/ITRF.
    
        For each position, it is best to provide the ETRS [x,y,z] position,
        although this can be overruled through supplying an ITRF [x,y,z] position
        instead. If not, the ITRF position is calculated by extrapolating the
        tectonic shifts from the ETRS position, into the frame and epoch as
        configured in ITRF_Reference_Frame and ITRF_Reference_Epoch.
    
        From the ITRF positions, the geographical [lat,long] positions are
        calculated, as well as the geohash.
        """
    
        @classmethod
        def get_mapped_attributes(cls, device_name) -> List[str]:
            """Return the names of the MappedAttributes bound to ``device_name``.

            Every name listed by ``dir(cls)`` is inspected; a name is kept when its
            class-level value is a MappedAttribute whose ``mapping_device`` equals
            ``device_name``. Names are returned in ``dir(cls)`` order.
            """

            def is_mapped_onto_device(member) -> bool:
                # Check the type first: only MappedAttributes carry 'mapping_device'.
                return (
                    isinstance(member, MappedAttribute)
                    and member.mapping_device == device_name
                )

            return [
                name for name in dir(cls) if is_mapped_onto_device(getattr(cls, name))
            ]
    
        @classmethod
        def get_mapped_dimensions(cls, device_name) -> Dict[str, tuple]:
            """Return the dimensions of every attribute mapped onto ``device_name``.

            The result maps each attribute name to ``(dim_x,)`` when its ``dim_y``
            is 0, and to ``(dim_y, dim_x)`` otherwise.
            """

            def dimensions_of(mapped_attribute) -> tuple:
                if mapped_attribute.dim_y == 0:
                    return (mapped_attribute.dim_x,)
                return (mapped_attribute.dim_y, mapped_attribute.dim_x)

            return {
                name: dimensions_of(getattr(cls, name))
                for name in cls.get_mapped_attributes(device_name)
            }
    
        # ----- Antenna names
    
        Antenna_Names = device_property(
            doc="Name of each antenna",
            dtype="DevVarStringArray",
            mandatory=False,
            default_value=[f"Antenna{n + 1}" for n in range(MAX_ANTENNA)],
        )
    
        # ----- Antenna set
    
        Antenna_Sets = device_property(
            doc="String representation of officially offered set of antennas, "
            "for use in digital beamforming.",
            dtype="DevVarStringArray",
            mandatory=False,
            default_value=["ALL"],
        )
    
        Antenna_Set_Masks = device_property(
            doc="String encoding of the corresponding antenna masks for the antennafield, "
            "for use in digital beamforming. Excludes calibration antennas.",
            dtype="DevVarStringArray",
            mandatory=False,
            default_value=["1" * MAX_ANTENNA],
        )
    
        # ----- Antenna states
    
        Antenna_Status = device_property(
            doc="Operational status state of each antenna",
            dtype="DevVarUShortArray",
            mandatory=False,
            default_value=numpy.array(
                [AntennaStatus.OK] * MAX_ANTENNA, dtype=AntennaStatus
            ),
        )
    
        Antenna_Use = device_property(
            doc="Operational State of each antenna",
            dtype="DevVarUShortArray",
            mandatory=False,
            default_value=numpy.array([AntennaUse.AUTO] * MAX_ANTENNA, dtype=AntennaUse),
        )
    
        # ----- Antenna properties
    
        Antenna_Needs_Power = device_property(
            doc="Whether to provide power to each antenna (False for noise sources)",
            dtype="DevVarBooleanArray",
            mandatory=False,
            default_value=numpy.array([True] * MAX_ANTENNA),
        )
    
        Antenna_Cables = device_property(
            doc=f"Which cables connect each antenna to the RCU. Both polarisations are "
            f"assumed to be connected using the same type of cable. Needs to be "
            f"any of ({', '.join(cable_types.keys())}).",
            dtype="DevVarStringArray",
            mandatory=False,
            default_value=numpy.array(["0m"] * MAX_ANTENNA),
        )
    
        Field_Attenuation = device_property(
            doc="Attenuation value to apply on all inputs.",
            dtype="DevShort",
            mandatory=False,
            default_value=10,
        )
    
        # ----- Position information
    
        Antenna_Field_Reference_ITRF = device_property(
            doc="ITRF position (XYZ) of each antenna field (leave empty to auto-derive "
            "from ETRS)",
            dtype="DevVarFloatArray",
            mandatory=False,
        )
    
        Antenna_Field_Reference_ETRS = device_property(
            doc="ETRS position (XYZ) of each antenna field",
            dtype="DevVarFloatArray",
            mandatory=False,
        )
    
        Antenna_Reference_ITRF = device_property(
            doc="ITRF position (XYZ) of each Antenna (leave empty to auto-derive from "
            "ETRS)",
            dtype="DevVarFloatArray",
            mandatory=False,
        )
    
        Antenna_Reference_ETRS = device_property(
            doc="ETRS position (XYZ) of each Antenna",
            dtype="DevVarFloatArray",
            mandatory=False,
        )
    
        ITRF_Reference_Frame = device_property(
            doc="Reference frame in which the ITRF coordinates are provided, or are to "
            "be computed from ETRS89",
            dtype="DevString",
            mandatory=False,
            default_value="ITRF2014",
        )
    
        # Epoch used for the ITRF coordinates. Values that are not above 2000.0
        # (such as the default 0.0) make ITRF_Reference_Epoch_R report the current
        # year rounded down to half a year instead.
        ITRF_Reference_Epoch = device_property(
            doc="Reference epoch in which the ITRF coordinates are provided, or are to "
            "be extrapolated from ETRS89. If set to 0.0, the extrapolation from ETRS "
            "will be done rounded to the last half year (f.e. 2015.5 for november 2015)",
            dtype="DevFloat",
            mandatory=False,
            default_value=0.0,  # LOFAR1 used 2015.5,
        )
    
        PQR_to_ETRS_rotation_matrix = device_property(
            doc="Field-specific rotation matrix to convert PQR offsets to ETRS/ITRF "
            "offsets.",
            dtype="DevVarFloatArray",
            mandatory=False,
            default_value=numpy.array(
                [  # PQR->ETRS rotation matrix for the core stations
                    [-0.1195951054, -0.7919544517, 0.5987530018],
                    [0.9928227484, -0.0954186800, 0.0720990002],
                    [0.0000330969, 0.6030782884, 0.7976820024],
                ]
            ).flatten(),
        )
    
        # ----- SDP mapping
    
        Antenna_to_SDP_Mapping = device_property(
            dtype=(numpy.int32,),
            doc="The mapping of Antennas to FPGA input pairs. Each FPGA can handle 6 "
            "inputs, and SDP has 16 FPGAs. Each antenna is represented with a "
            "(fpga, input) value pair. The array is flattened, so must be reshaped "
            "upon use. An input=-1 means the antenna is unconnected.",
            default_value=numpy.array([-1] * MAX_ANTENNA * 2, dtype=numpy.int32),
        )
    
        # ----- RECV mapping
    
        RECV_Devices = device_property(
            dtype=(str,),
            doc="The set of RECVL or RECVH devices to which the Power and Control mappings refer.",
            mandatory=True,
        )
    
        # NB: For HBA, the line providing the X pol carries the power,
        #              the line providing the Y pol carries the control.
        #     For LBA, both lines carry the power,
        #              the line providing the Y pol carries the control.
    
        # Flattened (RECV id, input) pairs, one pair per antenna. The order matches
        # the example in the doc string and the readers that take column 0 as the
        # RECV device number.
        Control_to_RECV_mapping = device_property(
            dtype=(numpy.int32,),
            doc="The mapping of Antenna control lines to RECV mapping. Each RECV can "
            "handle 96 inputs. The Antenna number is the index and the value shows "
            "to which receiver device it is connected and on which input. The "
            "first integer is the RECV id. The second integer is the input. "
            "Example: [1, 3] = STAT/RECVH/1 with input 3. -1 means that the Antenna "
            "is not connected. The property is stored in a one dimensional "
            "structure. It needs to be reshaped to a list of lists of two items.",
            mandatory=False,
            default_value=[-1] * MAX_ANTENNA * 2,
        )
    
        # ----- Defaults
    
        Frequency_Band_RW_default = device_property(
            dtype=(str,),
            mandatory=False,
        )
    
        # ----- Generic information
    
        Antenna_Names_R = attribute(
            access=AttrWriteType.READ,
            dtype=(str,),
            max_dim_x=MAX_ANTENNA,
        )
    
        Antenna_to_SDP_Mapping_R = attribute(
            doc="To which (fpga, input) pair each antenna is connected. -1=unconnected.",
            dtype=((numpy.int32,),),
            max_dim_x=N_pol,
            max_dim_y=MAX_ANTENNA,
        )
    
        @attribute(
            access=AttrWriteType.READ,
            dtype=str,
            fisallowed="is_attribute_access_allowed",
        )
        def SDPFirmware_device_R(self):
            # Reports the device name of the SDPFirmware child of this device.
            # NOTE(review): documented with comments rather than a docstring, since
            # PyTango may pick up a reader's docstring as the attribute description.
            sdpfirmware = self.control.child(DeviceTypes.SDPFirmware)
            return sdpfirmware.dev_name()
    
        @attribute(
            access=AttrWriteType.READ,
            dtype="DevShort",
        )
        def Field_Attenuation_R(self):
            # Exposes the Field_Attenuation device property unchanged.
            field_attenuation = self.Field_Attenuation
            return field_attenuation
    
        @attribute(
            access=AttrWriteType.READ,
            dtype=((numpy.int32,),),
            max_dim_x=N_pol,
            max_dim_y=MAX_ANTENNA,
        )
        def Control_to_RECV_mapping_R(self):
            # The property is stored flat; expose it as rows of two values each.
            flat_mapping = numpy.array(self.Control_to_RECV_mapping)
            return flat_mapping.reshape(-1, 2)
    
        @attribute(
            access=AttrWriteType.READ,
            dtype=(str,),
            max_dim_x=16,
        )
        def RECV_Devices_R(self):
            # Exposes the RECV_Devices device property as a numpy array.
            recv_devices = numpy.array(self.RECV_Devices)
            return recv_devices
    
        @attribute(
            access=AttrWriteType.READ,
            dtype=((numpy.float64,),),
            max_dim_x=3,
            max_dim_y=3,
        )
        def PQR_to_ETRS_rotation_matrix_R(self):
            # The property holds the matrix flattened; restore its 3x3 shape.
            flat_matrix = numpy.array(
                self.PQR_to_ETRS_rotation_matrix, dtype=numpy.float64
            )
            return flat_matrix.reshape(3, 3)
    
        Frequency_Band_RW = attribute(
            doc="The selected frequency band of each polarisation of each antenna.",
            dtype=((str,),),
            max_dim_x=N_pol,
            max_dim_y=MAX_ANTENNA,
            access=AttrWriteType.READ_WRITE,
        )
    
        # ----- Cable information (between antenna and RCU)
    
        Antenna_Cables_R = attribute(
            doc=f"Which cables connect each antenna to the RCU. Both polarisations are "
            f"assumed to be connected using the same type of cable. Needs to be "
            f"any of ({', '.join(cable_types.keys())}).",
            dtype=(str,),
            max_dim_x=MAX_ANTENNA,
        )
        Antenna_Cables_Delay_R = attribute(
            doc="Delay caused by the cable between antenna and RCU, in seconds.",
            dtype=(numpy.float64,),
            max_dim_x=MAX_ANTENNA,
            unit="s",
        )
        Antenna_Cables_Loss_R = attribute(
            doc="Loss caused by the cable between antenna and RCU, in dB, for either polarisation.",
            dtype=((numpy.float64,),),
            max_dim_x=N_pol,
            max_dim_y=MAX_ANTENNA,
            unit="dB",
        )
    
        # ----- Status and usage information
    
        Antenna_Needs_Power_R = attribute(
            doc="Whether the antenna should be powered at all (False for f.e. signal generators).",
            dtype=(bool,),
            max_dim_x=MAX_ANTENNA,
        )
        Antenna_Status_R = attribute(
            doc="The status of each antenna.",
            dtype=(AntennaStatus,),
            max_dim_x=MAX_ANTENNA,
        )
        Antenna_Status_int_R = attribute(
            doc="The status of each antenna (as an integer). "
            "0=OK, 1=SUSPICIOUS, 2=BROKEN, 3=BEYOND_REPAIR, 4=NOT_AVAILABLE.",
            dtype=(numpy.uint32,),
            max_dim_x=MAX_ANTENNA,
        )
        # The AUTO rule below mirrors read_Antenna_Usage_Mask_R, which uses an
        # antenna in AUTO mode only when its status is <= AntennaStatus.SUSPICIOUS.
        Antenna_Use_R = attribute(
            doc="Whether each antenna should be used. 0=AUTO, 1=ON, 2=OFF. In AUTO "
            "mode, the antenna is used if its status is OK or SUSPICIOUS.",
            dtype=(AntennaUse,),
            max_dim_x=MAX_ANTENNA,
        )
        Antenna_Usage_Mask_R = attribute(
            doc="Whether each antenna will be used.",
            dtype=(bool,),
            max_dim_x=MAX_ANTENNA,
        )
        Antenna_Sets_R = attribute(
            doc="Officially offered set of antennas",
            dtype=(str,),
            max_dim_x=N_ANTENNA_SETS,
        )
        Antenna_Set_Masks_R = attribute(
            doc="String encoding of the corresponding antenna masks",
            dtype=(str,),
            max_dim_x=N_ANTENNA_SETS,
        )
    
        # ----- Attributes mapped on RECV
    
        ANT_mask_RW = MappedAttribute(
            "ANT_mask_RW",
            dtype=(bool,),
            max_dim_x=MAX_ANTENNA,
            access=AttrWriteType.READ_WRITE,
        )
        RCU_PWR_ANT_on_R = MappedAttribute(
            "RCU_PWR_ANT_on_R", dtype=(bool,), max_dim_x=MAX_ANTENNA
        )
        RCU_PWR_ANT_on_RW = MappedAttribute(
            "RCU_PWR_ANT_on_RW",
            dtype=(bool,),
            max_dim_x=MAX_ANTENNA,
            access=AttrWriteType.READ_WRITE,
        )
        RCU_band_select_R = MappedAttribute(
            "RCU_band_select_R",
            dtype=((numpy.int64,),),
            max_dim_x=N_pol,
            max_dim_y=MAX_ANTENNA,
        )
        RCU_band_select_RW = MappedAttribute(
            "RCU_band_select_RW",
            dtype=((numpy.int64,),),
            max_dim_x=N_pol,
            max_dim_y=MAX_ANTENNA,
            access=AttrWriteType.READ_WRITE,
        )
        RCU_attenuator_dB_R = MappedAttribute(
            "RCU_attenuator_dB_R",
            dtype=((numpy.int64,),),
            max_dim_x=N_pol,
            max_dim_y=MAX_ANTENNA,
            abs_change="1",
        )
        RCU_attenuator_dB_RW = MappedAttribute(
            "RCU_attenuator_dB_RW",
            dtype=((numpy.int64,),),
            max_dim_x=N_pol,
            max_dim_y=MAX_ANTENNA,
            access=AttrWriteType.READ_WRITE,
        )
        RCU_DTH_freq_R = MappedAttribute(
            "RCU_DTH_freq_R",
            dtype=(numpy.int64,),
            max_dim_x=MAX_ANTENNA,
            abs_change="1",
        )
        RCU_DTH_freq_RW = MappedAttribute(
            "RCU_DTH_freq_RW",
            dtype=(numpy.int64,),
            max_dim_x=MAX_ANTENNA,
            access=AttrWriteType.READ_WRITE,
        )
        RCU_DTH_on_R = MappedAttribute(
            "RCU_DTH_on_R",
            dtype=(bool,),
            max_dim_x=MAX_ANTENNA,
        )
        RCU_DTH_PWR_R = MappedAttribute(
            "RCU_DTH_PWR_R",
            dtype=(numpy.float64,),
            max_dim_x=MAX_ANTENNA,
            doc="RCU Dither source power (dBm). Range -25 to -4.",
        )
        RCU_DTH_PWR_RW = MappedAttribute(
            "RCU_DTH_PWR_RW",
            dtype=(numpy.float64,),
            max_dim_x=MAX_ANTENNA,
            access=AttrWriteType.READ_WRITE,
            doc="RCU Dither source power (dBm). Range -25 to -4.",
        )
        RCU_DAB_filter_on_R = MappedAttribute(
            "RCU_DAB_filter_on_R",
            dtype=(bool,),
            max_dim_x=MAX_ANTENNA,
        )
        RCU_DAB_filter_on_RW = MappedAttribute(
            "RCU_DAB_filter_on_RW",
            dtype=(bool,),
            max_dim_x=MAX_ANTENNA,
            access=AttrWriteType.READ_WRITE,
        )
        RCU_PCB_ID_R = MappedAttribute(
            "RCU_PCB_ID_R",
            dtype=((numpy.int64,),),
            max_dim_x=2,
            max_dim_y=MAX_ANTENNA,
            abs_change="1",
        )
        RCU_PCB_version_R = MappedAttribute(
            "RCU_PCB_version_R",
            dtype=((str,),),
            max_dim_x=2,
            max_dim_y=MAX_ANTENNA,
        )
    
        # ----- Attributes mapped on SDP
    
        FPGA_sdp_info_observation_id_R = MappedAttribute(
            "FPGA_sdp_info_observation_id_R",
            dtype=(numpy.uint32,),
            max_dim_x=MAX_ANTENNA,
            mapping_device="SDP",
        )
        FPGA_sdp_info_observation_id_RW = MappedAttribute(
            "FPGA_sdp_info_observation_id_RW",
            dtype=(numpy.uint32,),
            max_dim_x=MAX_ANTENNA,
            access=AttrWriteType.READ_WRITE,
            mapping_device="SDP",
        )
        FPGA_sdp_info_antenna_band_index_R = MappedAttribute(
            "FPGA_sdp_info_antenna_band_index_R",
            dtype=(numpy.uint32,),
            max_dim_x=MAX_ANTENNA,
            mapping_device="SDP",
        )
        FPGA_sdp_info_antenna_band_index_RW = MappedAttribute(
            "FPGA_sdp_info_antenna_band_index_RW",
            dtype=(numpy.uint32,),
            max_dim_x=MAX_ANTENNA,
            access=AttrWriteType.READ_WRITE,
            mapping_device="SDP",
        )
    
        # ----- Position information
    
        Antenna_Field_Reference_ITRF_R = attribute(
            access=AttrWriteType.READ,
            doc="Absolute reference position of antenna field, in ITRF (XYZ)",
            dtype=(numpy.float64,),
            max_dim_x=N_xyz,
        )
    
        Antenna_Field_Reference_GEO_R = attribute(
            access=AttrWriteType.READ,
            doc="Absolute reference position of antenna field, "
            "in latitude/longitude (degrees)",
            dtype=(numpy.float64,),
            max_dim_x=N_latlong,
        )
    
        Antenna_Field_Reference_GEOHASH_R = attribute(
            access=AttrWriteType.READ,
            doc="Absolute reference position of antenna field, as a geohash string",
            dtype=str,
        )
    
        Antenna_Reference_ITRF_R = attribute(
            access=AttrWriteType.READ,
            doc="Absolute reference position of each tile, in ITRF (XYZ)",
            dtype=((numpy.float64,),),
            max_dim_x=N_xyz,
            max_dim_y=MAX_ANTENNA,
        )
    
        Antenna_Reference_GEO_R = attribute(
            access=AttrWriteType.READ,
            doc="Absolute reference position of each tile, in latitude/longitude (degrees)",
            dtype=((numpy.float64,),),
            max_dim_x=N_latlong,
            max_dim_y=MAX_ANTENNA,
        )
    
        Antenna_Reference_GEOHASH_R = attribute(
            access=AttrWriteType.READ,
            doc="Absolute reference position of each tile, as geohash strings",
            dtype=(str,),
            max_dim_x=MAX_ANTENNA,
        )
    
        antenna_type_R = attribute(
            doc="Type of Antennas in this field (f.e. LBA or HBA)", dtype=str
        )
        nr_antennas_R = attribute(doc="Number of Antennas in this field", dtype=numpy.int32)
    
        @attribute(
            doc="Epoch used to extrapolate the ITRF model to.",
            dtype=numpy.float64,
            unit="years",
        )
        def ITRF_Reference_Epoch_R(self):
            # A configured epoch above 2000.0 is reported as-is.
            configured_epoch = self.ITRF_Reference_Epoch
            if configured_epoch > 2000.0:
                return configured_epoch

            # Otherwise report the current year (local clock), rounded down to
            # half a year: x.0 for months 1-6, x.5 for months 7-12.
            today = datetime.datetime.now()
            half_year_offset = 0.0 if today.month <= 6 else 0.5
            return numpy.float64(today.year + half_year_offset)
    
        def __init__(self, cl, name):
            """Initialise the proxy holders, then hand over to the base class."""
            # Proxy holders start out empty; _setup_all_proxies() fills them in.
            self.recv_proxies = []
            self.sdp_proxy = None
            self.sdpfirmware_proxy = None

            # Keep this call last: the base constructor executes init_device, so
            # the members above have to exist before it runs.
            super().__init__(cl, name)
    
        @property
        def nr_antennas(self):
            """Number of antennas in this field, derived from Control_to_RECV_mapping."""
            # Several sources are expected to agree on the number of antennas:
            # * Control_to_RECV_mapping (after reshaping),
            # * Power_to_RECV_mapping (after reshaping),
            # * the antennas exposed through Antenna_Reference_ITRF_R,
            # * Antenna_Use,
            # * Antenna_Status,
            # * Antenna_to_SDP_Mapping.
            #
            # Parsing a property is the quickest of these. The mapping holds two
            # values per antenna, hence the integer division by two.
            flat_mapping_length = len(self.Control_to_RECV_mapping)
            return flat_mapping_length // 2
    
        def read_Antenna_Names_R(self):
            """Return the configured antenna names, cut off at nr_antennas entries."""
            return numpy.array(self.Antenna_Names)[: self.nr_antennas]
    
        @command(dtype_in=str, dtype_out=DevVarBooleanArray)
        def antenna_set_to_mask(self, antenna_set):
            """Translate the antenna set string code into the corresponding
            antenna mask array"""
            # Inside this method the name resolves to the module-level helper
            # imported from tangostationcontrol.common.antennas, not to this command.
            nr_antennas = self.nr_antennas
            return antenna_set_to_mask(
                antenna_set, nr_antennas, self.Antenna_Sets, self.Antenna_Set_Masks
            )
    
        def read_Frequency_Band_RW(self):
            """Derive the frequency band of every RCU band selection.

            Combines our antenna type, the RCU band selection (RCU_band_select_RW)
            and the clock of the SDPFirmware device, and looks up the matching band
            for each element. Combinations without a known band are reported as an
            "<unsupported: ...>" string instead of raising.
            """
            antenna_type = self.ANTENNA_TYPE

            # RECV side: the RCU band selected for each antenna input
            selected_rcu_bands = self.read_attribute("RCU_band_select_RW")

            # SDP side: the configured clock
            clock = self.sdpfirmware_proxy.clock_RW

            def lookup_band(rcu_band):
                # NB: the names antenna_type, clock and rcu_band are part of the
                # message below through the "=" format specifier; do not rename.
                try:
                    band = Band.lookup_rcu_band(antenna_type, clock, rcu_band)
                except ValueError:
                    # unknown configuration
                    return f"<unsupported: {antenna_type=} {clock=} {rcu_band=}>"
                return band

            # apply the lookup element-wise
            lookup_all_bands = numpy.vectorize(lookup_band)
            return lookup_all_bands(selected_rcu_bands)
    
        def write_Frequency_Band_RW(self, value):
            """Select the frequency band for each polarisation of each antenna.

            Validates the requested bands, then applies them as the RCU band
            selection (through self.proxy), the clock of the SDPFirmware device and
            the nyquist zones of the SDP device.

            :param value: band names, indexed as [antenna][polarisation].
            :raises ValueError: if a band is unknown, is meant for another antenna
                type, or uses a different clock than the first band.
            """
            # use numpy for easy processing
            value = numpy.array(value)

            # validate content
            for val in value.flatten():
                if val not in bands:
                    raise ValueError(
                        f"Unsupported frequency band: {val}. Must be one of {list(bands)}."
                    )

                if bands[val].antenna_type != self.ANTENNA_TYPE:
                    # Adjacent string literals are used instead of a backslash
                    # continuation inside the string, which would embed the
                    # indentation of the next source line into the message.
                    raise ValueError(
                        f"Unsupported frequency band for our antenna type: {val} "
                        f"is for {bands[val].antenna_type}, "
                        f"but we are {self.ANTENNA_TYPE}."
                    )

                if (
                    bands[val].clock != bands[value[0, 0]].clock
                ):  # NB: "value[0,0] in bands" holds at this point
                    raise ValueError(
                        "All frequency bands must use the same clock. "
                        f"These do not: {val} and {value[0, 0]}."
                    )

            # convert into settings for RECV
            self.proxy.RCU_band_select_RW = numpy.vectorize(
                lambda band: bands[band].rcu_band
            )(value)

            # apply settings on SDP
            self.sdpfirmware_proxy.clock_RW = bands[value[0, 0]].clock

            # read-modify-write on [fpga][(input, polarisation)]
            # Entries left at None belong to inputs that are not set here.
            sdp_nyquist_zone = numpy.full((N_pn, A_pn * N_pol), None)

            for antenna_nr, (fpga_nr, input_nr) in enumerate(
                self.read_attribute("Antenna_to_SDP_Mapping_R")
            ):
                # input -1 marks an antenna that is not connected to SDP
                if input_nr == -1:
                    continue

                # set for x polarisation
                sdp_nyquist_zone[fpga_nr, input_nr * 2 + 0] = bands[
                    value[antenna_nr, 0]
                ].nyquist_zone
                # set for y polarisation
                sdp_nyquist_zone[fpga_nr, input_nr * 2 + 1] = bands[
                    value[antenna_nr, 1]
                ].nyquist_zone

            self.atomic_read_modify_write_attribute(
                sdp_nyquist_zone, self.sdp_proxy, "nyquist_zone_RW", None, numpy.uint32
            )
    
        def read_Antenna_Cables_R(self):
            """Return the configured cable types, cut off at nr_antennas entries."""
            return numpy.array(self.Antenna_Cables)[: self.nr_antennas]
    
        def read_Antenna_Cables_Delay_R(self):
            """Return the delay of the cable type configured for each antenna."""
            delays = [
                cable_types[cable_name].delay
                for cable_name in self.read_Antenna_Cables_R()
            ]
            return numpy.array(delays)
    
        def read_Antenna_Cables_Loss_R(self):
            """Return the cable loss of both polarisations of each antenna.

            The loss is looked up from the antenna's cable type, our antenna type
            and the RCU band selected for each polarisation. Antennas whose RECV
            number is not positive get a loss of [0, 0].
            """
            rcu_bands = self.read_attribute("RCU_band_select_RW")

            # first column of the reshaped mapping is the RECV device number
            recv_numbers = numpy.array(self.Control_to_RECV_mapping).reshape(-1, 2)[:, 0]

            def losses_of(recv, cable, rcu_band):
                # Unconnected antennas return RCU band 0, which does not exist.
                # Return 0 loss for them instead.
                if not recv > 0:
                    return [0, 0]

                return [
                    cable_types[cable].get_loss(self.ANTENNA_TYPE, rcu_band[0]),
                    cable_types[cable].get_loss(self.ANTENNA_TYPE, rcu_band[1]),
                ]

            per_antenna = zip(recv_numbers, self.Antenna_Cables, rcu_bands)
            return numpy.array(
                [losses_of(recv, cable, rcu_band) for recv, cable, rcu_band in per_antenna]
            )
    
        def read_Antenna_Needs_Power_R(self):
            """Return the configured power flags, cut off at nr_antennas entries."""
            return numpy.array(self.Antenna_Needs_Power)[: self.nr_antennas]
    
        def read_Antenna_Use_R(self):
            """Return the configured use of each antenna as AntennaUse members."""
            raw_values = numpy.array(self.Antenna_Use)[: self.nr_antennas]
            as_enums = list(map(AntennaUse, raw_values))
            return numpy.array(as_enums, dtype=AntennaUse)
    
        def read_Antenna_Status_R(self):
            """Return the configured status of each antenna as AntennaStatus members."""
            raw_values = numpy.array(self.Antenna_Status)[: self.nr_antennas]
            as_enums = list(map(AntennaStatus, raw_values))
            return numpy.array(as_enums, dtype=AntennaStatus)
    
        def read_Antenna_Status_int_R(self):
            """Return the status of each antenna as unsigned integers."""
            statuses = self.read_Antenna_Status_R()
            integer_values = [status.value for status in statuses]
            return numpy.array(integer_values, dtype=numpy.uint32)
    
        def read_Antenna_Usage_Mask_R(self):
            """Return, per antenna, whether it will be used.

            An antenna is used when it is forced ON, or when it is in AUTO mode and
            its status is at most SUSPICIOUS.
            """
            use = self.read_Antenna_Use_R()
            status = self.read_Antenna_Status_R()

            forced_on = use == AntennaUse.ON
            in_auto_mode = use == AntennaUse.AUTO
            status_allows_use = status <= AntennaStatus.SUSPICIOUS

            return numpy.logical_or(
                forced_on, numpy.logical_and(in_auto_mode, status_allows_use)
            )
    
        def read_Antenna_Sets_R(self):
            """Return the Antenna_Sets device property unchanged."""
            antenna_sets = self.Antenna_Sets
            return antenna_sets
    
        def read_Antenna_Set_Masks_R(self):
            """Return the Antenna_Set_Masks device property unchanged."""
            antenna_set_masks = self.Antenna_Set_Masks
            return antenna_set_masks
    
        def read_Antenna_to_SDP_Mapping_R(self):
            """Return the antenna-to-SDP mapping as rows of (fpga, input) pairs."""
            # The property is stored flattened, so restore the pairs here.
            flat_mapping = numpy.array(self.Antenna_to_SDP_Mapping)
            return flat_mapping.reshape(-1, 2)
    
        def read_nr_antennas_R(self):
            """Return the number of antennas, as computed by nr_antennas."""
            antenna_count = self.nr_antennas
            return antenna_count
    
        def read_antenna_type_R(self):
            """Return the ANTENNA_TYPE of this device."""
            # NOTE(review): ANTENNA_TYPE is not defined in the visible part of this
            # class; presumably the concrete subclasses provide it -- confirm.
            antenna_type = self.ANTENNA_TYPE
            return antenna_type
    
        def read_Antenna_Field_Reference_ITRF_R(self):
            """Return the ITRF (XYZ) reference position of the antenna field.

            Configured ITRF coordinates take precedence. Without them, the position
            is converted from the configured ETRS coordinates, using the configured
            ITRF reference frame and the epoch from ITRF_Reference_Epoch_R.
            """
            configured_itrf = self.Antenna_Field_Reference_ITRF
            if configured_itrf:
                # ITRF field coordinates were configured: use them directly
                return numpy.array(configured_itrf).reshape(N_xyz)

            # derive the position from the ETRS coordinates instead
            field_etrs = numpy.array(self.Antenna_Field_Reference_ETRS).reshape(N_xyz)
            reference_epoch = self.read_attribute("ITRF_Reference_Epoch_R")
            return ETRS_to_ITRF(field_etrs, self.ITRF_Reference_Frame, reference_epoch)
    
        def read_Antenna_Field_Reference_GEO_R(self):
            """Return the antenna field reference converted with ``ITRF_to_GEO``.

            The input is the result of ``read_Antenna_Field_Reference_ITRF_R()``.
            """
            field_itrf = self.read_Antenna_Field_Reference_ITRF_R()
            return ITRF_to_GEO(field_itrf)
    
        def read_Antenna_Field_Reference_GEOHASH_R(self):
            """Return the antenna field reference converted with ``GEO_to_GEOHASH``.

            The input is the result of ``read_Antenna_Field_Reference_GEO_R()``.
            """
            field_geo = self.read_Antenna_Field_Reference_GEO_R()
            return GEO_to_GEOHASH(field_geo)
    
        def read_Antenna_Reference_ITRF_R(self):
            """Return the ITRF coordinates of the individual antennas.

            If ``Antenna_Reference_ITRF`` is configured (truthy), it is returned
            reshaped to ``(nr_antennas, N_xyz)``. Otherwise the coordinates are
            derived from ``Antenna_Reference_ETRS`` with ``ETRS_to_ITRF``, using
            ``ITRF_Reference_Frame`` and the epoch read from
            ``ITRF_Reference_Epoch_R``.
            """
            per_antenna_shape = (self.nr_antennas, N_xyz)

            # ITRF coordinates were configured: provide those
            if self.Antenna_Reference_ITRF:
                return numpy.array(self.Antenna_Reference_ITRF).reshape(
                    per_antenna_shape
                )

            # otherwise convert the ETRS coordinates, using the configured ITRF
            # reference
            etrs = numpy.array(self.Antenna_Reference_ETRS).reshape(per_antenna_shape)
            reference_epoch = self.read_attribute("ITRF_Reference_Epoch_R")

            return ETRS_to_ITRF(etrs, self.ITRF_Reference_Frame, reference_epoch)
    
        def read_Antenna_Reference_GEO_R(self):
            """Return the antenna references converted with ``ITRF_to_GEO``.

            The input is the result of ``read_Antenna_Reference_ITRF_R()``.
            """
            antennas_itrf = self.read_Antenna_Reference_ITRF_R()
            return ITRF_to_GEO(antennas_itrf)
    
        def read_Antenna_Reference_GEOHASH_R(self):
            """Return the antenna references converted with ``GEO_to_GEOHASH``.

            The input is the result of ``read_Antenna_Reference_GEO_R()``.
            """
            antennas_geo = self.read_Antenna_Reference_GEO_R()
            return GEO_to_GEOHASH(antennas_geo)
    
        def _setup_all_proxies(self):
            """(Re)create the proxies this device talks to.

            Empties ``self.recv_proxies`` and refills it with one write-access
            proxy per entry of ``RECV_Devices``. Then obtains the SDPFirmware
            child through ``self.control`` and, from its ``SDP_device_R``, the
            SDP child.
            """
            proxies = self.recv_proxies
            proxies.clear()

            for device_name in self.RECV_Devices:
                proxy = create_device_proxy(device_name, write_access=True)
                proxies.append(proxy)

            sdpfirmware = self.control.child(DeviceTypes.SDPFirmware)
            self.sdpfirmware_proxy = sdpfirmware
            self.sdp_proxy = self.control.child(sdpfirmware.SDP_device_R)
    
        def _setup_recv_mapper(self):
            """Build ``self._recv_mapper`` from the control and power mappings.

            The mapper itself is created by ``_get_recv_mapper``, which receives
            the mappings and the number of RECV proxies.
            """
            # Properties are stored as 1d arrays, so restore the (-1, 2) layout
            control_mapping = numpy.reshape(self.Control_to_RECV_mapping, (-1, 2))
            power_mapping = numpy.reshape(self.Power_to_RECV_mapping, (-1, 2))

            recv_mapping = {
                MappingKeys.CONTROL: control_mapping,
                MappingKeys.POWER: power_mapping,
            }
            nr_recv_devices = len(self.recv_proxies)

            self._recv_mapper = self._get_recv_mapper(recv_mapping, nr_recv_devices)
    
        def _get_recv_mapper(self, recv_mapping, nr_recv_devices):
            """Provide the RECV mapper based on the antenna type

            Not implemented here; this version always raises.
            NOTE(review): presumably overridden per antenna type in subclasses.

            :param recv_mapping: dict with the reshaped control and power
                                 mappings, as built by ``_setup_recv_mapper``.
            :param nr_recv_devices: number of RECV proxies.
            :raises NotImplementedError: always.
            """
            raise NotImplementedError
    
        def _setup_sdp_mapper(self):
            """Build the ``AntennaToSdpMapper`` used for SDP mapped attributes.

            The mapper is given the ``Antenna_to_SDP_Mapping`` property under
            ``MappingKeys.FPGA`` and the result of
            ``get_mapped_dimensions("SDP")``.
            """
            # Properties are stored as 1d arrays, so restore the (-1, 2) layout
            antenna_to_fpga = numpy.reshape(self.Antenna_to_SDP_Mapping, (-1, 2))

            sdp_mapping = {MappingKeys.FPGA: antenna_to_fpga}
            sdp_dimensions = self.get_mapped_dimensions("SDP")

            self.__sdp_mapper = AntennaToSdpMapper(sdp_mapping, sdp_dimensions)
    
        def get_mapped_attribute(self, mapped_point: str, mapped_device: str):
            """Read method implementation of the MappedAttribute class

            :param mapped_point: name of the attribute to read on the mapped
                                 device(s).
            :param mapped_device: ``"SDP"`` to read from the SDP proxy; any other
                                  value reads from all RECV proxies.
            :return: the values read, passed through ``map_read`` of the SDP or
                     RECV mapper.
            """
            if mapped_device == "SDP":
                sdp_values = self.sdp_proxy.read_attribute(mapped_point).value
                return self.__sdp_mapper.map_read(mapped_point, sdp_values)

            # RECV: collect the value from every device, in proxy order
            per_device_values = [
                proxy.read_attribute(mapped_point).value for proxy in self.recv_proxies
            ]

            return self._recv_mapper.map_read(mapped_point, per_device_values)
    
        def set_mapped_attribute(
            self, mapped_point: str, value, cast_type: type, mapped_device: str
        ):
            """Write a new value to a mapped attribute, for controlled points only

            :param mapped_point: name of the attribute to write on the mapped
                                 device(s).
            :param value: the new value; it is translated by ``map_write`` of the
                          SDP or RECV mapper before being written.
            :param cast_type: forwarded as ``cast_type`` to
                              ``atomic_read_modify_write_attribute``.
            :param mapped_device: ``"SDP"`` to write to the SDP proxy; any other
                                  value writes to all RECV proxies.

            :warning: This method is susceptible to a lost update race condition if
                      the attribute on the RECVH/RECVL device is written to in
                      between `read_attribute` and `write_attribute`!

            """
            if mapped_device == "SDP":
                # map_write returns a sparse multidimensional array, in which
                # uncontrolled values are set to None
                sdp_values = self.__sdp_mapper.map_write(mapped_point, value)

                self.atomic_read_modify_write_attribute(
                    sdp_values, self.sdp_proxy, mapped_point, cast_type=cast_type
                )
                return

            # map_write returns a sparse multidimensional array, in which
            # uncontrolled values are set to None. It is indexed per RECV device.
            per_device_values = self._recv_mapper.map_write(mapped_point, value)

            for device_nr, proxy in enumerate(self.recv_proxies):
                self.atomic_read_modify_write_attribute(
                    per_device_values[device_nr],
                    proxy,
                    mapped_point,
                    cast_type=cast_type,
                )
    
        # --------
        # Overloaded functions
        # --------
    
        @log_exceptions()
        def configure_for_initialise(self):
            """Run the parent initialisation, then set up the proxies and mappers."""
            super().configure_for_initialise()

            # The proxies come first: _setup_recv_mapper() needs to know how many
            # RECV proxies there are.
            self._setup_all_proxies()
            self._setup_recv_mapper()
            self._setup_sdp_mapper()
    
        def _read_hardware_powered_fraction_R(self):
            """Read attribute which monitors the power

            :return: the number of antennas set in both ``RCU_PWR_ANT_on_R`` and
                     ``ANT_mask_RW``, divided by the number set in
                     ``ANT_mask_RW``; 1.0 if that division raises
                     ``ZeroDivisionError`` (nothing set in the mask).
            """

            mask = self.read_attribute("ANT_mask_RW")
            powered = self.read_attribute("RCU_PWR_ANT_on_R")

            nr_powered = numpy.count_nonzero(powered & mask)
            nr_masked = numpy.count_nonzero(mask)

            try:
                fraction = nr_powered / nr_masked
            except ZeroDivisionError:
                fraction = 1.0

            return fraction
    
        def _power_hardware_on(self):
            """Power on by running ``configure_recv()`` followed by ``configure_sdp()``."""
            # Configure the devices that process our antennas
            self.configure_recv()
            self.configure_sdp()
    
        def power_antennas_off(self, mask: numpy.ndarray):
            """Power the specified antennas off, without touching other antennas.

            ``ANT_mask_RW`` is temporarily replaced by ``mask`` while
            ``RCU_PWR_ANT_on_RW`` is updated, and is always written back
            afterwards, also if the update fails.

            :param mask: a boolean array indicating which antennas should be
                         powered OFF.
            """
            # Remember the mask as it is now, to put it back when done
            previous_mask = self.read_attribute("ANT_mask_RW")

            # Enable controlling all requested antennas
            self.proxy.write_attribute("ANT_mask_RW", mask)

            try:
                # Clear only the requested antennas in the current power setting
                currently_on = self.read_attribute("RCU_PWR_ANT_on_RW")
                stay_on = numpy.logical_and(currently_on, numpy.logical_not(mask))

                # Turn off power to all antennas in the requested mask
                self.proxy.write_attribute("RCU_PWR_ANT_on_RW", stay_on)
            finally:
                # Restore original mask
                self.proxy.write_attribute("ANT_mask_RW", previous_mask)
    
        def _power_hardware_off(self):
            """Power all antennas off.

            Calls ``power_antennas_off`` with a mask of ``nr_antennas`` times
            ``True``.
            """
            all_antennas = [True] * self.nr_antennas
            self.power_antennas_off(all_antennas)
    
        # --------
        # Commands
        # --------
    
        @command()
        @only_in_states(DEFAULT_COMMAND_STATES)
        @DebugIt()
        @log_exceptions()
        def configure_recv(self):
            """Configure RECV to process our antennas.

            In order:

            1. power off the antennas that are not in ``Antenna_Usage_Mask_R``,
            2. set ``ANT_mask_RW`` to ``Antenna_Usage_Mask_R``,
            3. set ``RCU_PWR_ANT_on_RW`` to the antennas that are both in
               ``Antenna_Needs_Power_R`` and in ``Antenna_Usage_Mask_R``.
            """

            # Read the usage mask once, so that all steps below act on the very
            # same set of antennas and no redundant attribute reads are done.
            usage_mask = self.read_attribute("Antenna_Usage_Mask_R")

            # Power off what should be off, f.e. if they got turned off in the mask
            # but are still powered.
            self.power_antennas_off(numpy.logical_not(usage_mask))

            # Disable controlling the antennas that fall outside the mask
            self.proxy.write_attribute("ANT_mask_RW", usage_mask)

            # Turn on power to antennas that we use, and that need it
            self.proxy.write_attribute(
                "RCU_PWR_ANT_on_RW",
                numpy.logical_and(
                    self.read_attribute("Antenna_Needs_Power_R"),
                    usage_mask,
                ),
            )
    
        @command()
        @only_in_states(DEFAULT_COMMAND_STATES)
        @DebugIt()
        @log_exceptions()
        def configure_sdp(self):
            """Configure SDP to process our antennas.

            The body consists of this docstring only, so this implementation
            performs no actions.
            NOTE(review): presumably meant to be overridden where SDP needs
            configuring -- confirm.
            """
    
        @command()
        @DebugIt()
        @only_in_states(DEFAULT_COMMAND_STATES)
        def RCU_DTH_setup(self):
            """Call ``RCU_DTH_setup()`` on RECV devices through a ``RecvDeviceWalker``.

            The walker is built from ``Control_to_RECV_mapping_R`` and
            ``Antenna_Usage_Mask_R`` and is handed ``self.recv_proxies``.
            NOTE(review): which receivers are actually visited is decided by
            ``RecvDeviceWalker`` -- presumably those serving used antennas.
            """
            control_mapping = self.read_attribute("Control_to_RECV_mapping_R")
            usage_mask = self.read_attribute("Antenna_Usage_Mask_R")

            RecvDeviceWalker(control_mapping, usage_mask).walk_receivers(
                self.recv_proxies, lambda recv: recv.RCU_DTH_setup()
            )
    
        @command()
        @DebugIt()
        @only_in_states(DEFAULT_COMMAND_STATES)
        def RCU_DTH_on(self):
            """Call ``RCU_DTH_on()`` on RECV devices through a ``RecvDeviceWalker``.

            The walker is built from ``Control_to_RECV_mapping_R`` and
            ``Antenna_Usage_Mask_R`` and is handed ``self.recv_proxies``.
            NOTE(review): which receivers are actually visited is decided by
            ``RecvDeviceWalker`` -- presumably those serving used antennas.
            """
            control_mapping = self.read_attribute("Control_to_RECV_mapping_R")
            usage_mask = self.read_attribute("Antenna_Usage_Mask_R")

            RecvDeviceWalker(control_mapping, usage_mask).walk_receivers(
                self.recv_proxies, lambda recv: recv.RCU_DTH_on()
            )
    
        @command()
        @DebugIt()
        @only_in_states(DEFAULT_COMMAND_STATES)
        def RCU_DTH_off(self):
            """Call ``RCU_DTH_off()`` on RECV devices through a ``RecvDeviceWalker``.

            The walker is built from ``Control_to_RECV_mapping_R`` and
            ``Antenna_Usage_Mask_R`` and is handed ``self.recv_proxies``.
            NOTE(review): which receivers are actually visited is decided by
            ``RecvDeviceWalker`` -- presumably those serving used antennas.
            """
            control_mapping = self.read_attribute("Control_to_RECV_mapping_R")
            usage_mask = self.read_attribute("Antenna_Usage_Mask_R")

            RecvDeviceWalker(control_mapping, usage_mask).walk_receivers(
                self.recv_proxies, lambda recv: recv.RCU_DTH_off()
            )