From 2c72587789c0c8676746042b160ab4b65abf367f Mon Sep 17 00:00:00 2001 From: Stefano Di Frischia <stefano.difrischia@inaf.it> Date: Fri, 28 Jul 2023 14:12:12 +0000 Subject: [PATCH] Resolve L2SS-1218 "Refactor antennamapper" --- README.md | 2 + tangostationcontrol/VERSION | 2 +- .../devices/antennafield.py | 470 ++----------- .../devices/base_device_classes/mapper.py | 412 +++++++++++ .../test/devices/test_antennafield_device.py | 647 ++++++++++-------- 5 files changed, 823 insertions(+), 710 deletions(-) create mode 100644 tangostationcontrol/tangostationcontrol/devices/base_device_classes/mapper.py diff --git a/README.md b/README.md index 0423d581a..eab36569f 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,8 @@ Next change the version in the following places: # Release Notes +* 0.20.1 Create an abstract AntennaMapper class which implements behavior of both AntennaToSdpMapper + and AntennaToRecvMapper * 0.20.0 Complete implementation of station-state transitions in StationManager device. Unified power management under power_hardware_on/off(), dropping prepare_hardware(), disable_hardware(). diff --git a/tangostationcontrol/VERSION b/tangostationcontrol/VERSION index 5a03fb737..847e9aef6 100644 --- a/tangostationcontrol/VERSION +++ b/tangostationcontrol/VERSION @@ -1 +1 @@ -0.20.0 +0.20.1 diff --git a/tangostationcontrol/tangostationcontrol/devices/antennafield.py b/tangostationcontrol/tangostationcontrol/devices/antennafield.py index fb1ff7dfc..58b6dea33 100644 --- a/tangostationcontrol/tangostationcontrol/devices/antennafield.py +++ b/tangostationcontrol/tangostationcontrol/devices/antennafield.py @@ -7,7 +7,7 @@ import logging from enum import IntEnum from math import pi -from typing import List +from typing import List, Dict import numpy @@ -33,8 +33,6 @@ from tangostationcontrol.common.constants import ( N_pol, N_xyz, N_latlong, - N_rcu, - N_rcu_inp, N_pn, A_pn, N_ANTENNA_SETS, @@ -47,15 +45,20 @@ from tangostationcontrol.common.lofar_logging import ( ) from tangostationcontrol.common.proxy import create_device_proxy from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES -from tangostationcontrol.common.type_checking import type_not_sequence from tangostationcontrol.common.antennas import antenna_set_to_mask from tangostationcontrol.devices.device_decorators import only_in_states from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice from tangostationcontrol.devices.types import DeviceTypes +from tangostationcontrol.devices.base_device_classes.mapper import ( + MappingKeys, + MappedAttribute, + AntennaToRecvMapper, + AntennaToSdpMapper, +) logger = logging.getLogger() -__all__ = ["AntennaField", "AntennaToRecvMapper", "AntennaToSdpMapper", "main"] +__all__ = ["AntennaField", "main"] class AntennaUse(IntEnum): @@ -71,44 +74,6 @@ class AntennaQuality(IntEnum): BEYOND_REPAIR = 3 -class MappedAttribute(attribute): - def __init__( - self, - mapping_attribute, - dtype, - max_dim_x, - max_dim_y=0, - access=AttrWriteType.READ, - mapping_device="RECV", - **kwargs, - ): - if access == AttrWriteType.READ_WRITE: - - def write_func_wrapper(device, value): - cast_type = dtype - while not type_not_sequence(cast_type): - cast_type = cast_type[0] - write_func = device.set_mapped_attribute( - mapping_attribute, value, cast_type, mapping_device - ) - - self.fset = write_func_wrapper - - def read_func_wrapper(device): - return device.get_mapped_attribute(mapping_attribute, mapping_device) - - self.fget = read_func_wrapper - - super().__init__( - dtype=dtype, - max_dim_y=max_dim_y, - max_dim_x=max_dim_x, - access=access, - fisallowed="is_attribute_access_allowed", - **kwargs, - ) - - @device_logging_to_python() class AntennaField(LOFARDevice): """Manages the antennas in a single antenna field, by acting as a @@ -133,6 +98,32 @@ class AntennaField(LOFARDevice): calculated, as well as the geohash. """ + @classmethod + def get_mapped_attributes(cls, device_name) -> List[str]: + """Return a list of the mapped attributes""" + # collect all attributes for which defaults are provided + result = [] + mapped_attrs = [ + name for name in dir(cls) if isinstance(getattr(cls, name), MappedAttribute) + ] + for attr in mapped_attrs: + attr_instance = getattr(AntennaField, attr) + # check the variable 'mapping_device' + if attr_instance.mapping_device == device_name: + result.append(attr) + return result + + @classmethod + def get_mapped_dimensions(cls, device_name) -> Dict[str, tuple]: + """Return a dictionary of the mapped attribute dimensions""" + mapped_dims = {} + for attr in cls.get_mapped_attributes(device_name): + attr_instance = getattr(cls, attr) + dim_x, dim_y = attr_instance.dim_x, attr_instance.dim_y + # Insert the dimension(s) in the map + mapped_dims[attr] = (dim_x,) if dim_y == 0 else (dim_y, dim_x) + return mapped_dims + # ----- Antenna names Antenna_Names = device_property( @@ -933,14 +924,23 @@ class AntennaField(LOFARDevice): # Reshape of mapping is needed because properties are stored in 1d arrays control_mapping = numpy.reshape(self.Control_to_RECV_mapping, (-1, 2)) power_mapping = numpy.reshape(self.Power_to_RECV_mapping, (-1, 2)) + recv_mapping = { + MappingKeys.CONTROL: control_mapping, + MappingKeys.POWER: power_mapping, + } self.__recv_mapper = AntennaToRecvMapper( - control_mapping, power_mapping, number_of_receivers + recv_mapping, + self.get_mapped_dimensions("RECV"), + number_of_receivers, ) def __setup_sdp_mapper(self): # Reshape of mapping is needed because properties are stored in 1d arrays fpga_sdp_mapping = numpy.reshape(self.Antenna_to_SDP_Mapping, (-1, 2)) - self.__sdp_mapper = AntennaToSdpMapper(fpga_sdp_mapping) + self.__sdp_mapper = AntennaToSdpMapper( + {MappingKeys.FPGA: fpga_sdp_mapping}, + self.get_mapped_dimensions("SDP"), + ) def get_mapped_attribute(self, mapped_point: str, mapped_device: str): """Read method implementation of the MappedAttribute class""" @@ -1090,386 +1090,6 @@ class AntennaField(LOFARDevice): return result_values.flatten() -class AntennaToSdpMapper(object): - _VALUE_MAP_NONE_16 = numpy.full(N_pn, None) - - def __init__(self, fpga_sdp_mapping) -> None: - number_of_antennas = len(fpga_sdp_mapping) - - value_map_fpga_info_int = numpy.zeros(number_of_antennas, dtype=numpy.uint32) - - self._fpga_mapping = fpga_sdp_mapping - self._value_mapper = { - "FPGA_sdp_info_observation_id_R": self._fpga_mapping, - "FPGA_sdp_info_observation_id_RW": self._fpga_mapping, - "FPGA_sdp_info_antenna_band_index_R": self._fpga_mapping, - "FPGA_sdp_info_antenna_band_index_RW": self._fpga_mapping, - } - self._default_value_mapping_read = { - "FPGA_sdp_info_observation_id_R": value_map_fpga_info_int, - "FPGA_sdp_info_observation_id_RW": value_map_fpga_info_int, - "FPGA_sdp_info_antenna_band_index_R": value_map_fpga_info_int, - "FPGA_sdp_info_antenna_band_index_RW": value_map_fpga_info_int, - } - self._masked_value_mapping_write = { - "FPGA_sdp_info_observation_id_RW": AntennaToSdpMapper._VALUE_MAP_NONE_16, - "FPGA_sdp_info_antenna_band_index_RW": AntennaToSdpMapper._VALUE_MAP_NONE_16, - } - self._reshape_attributes_in = { - "FPGA_sdp_info_observation_id_R": (MAX_ANTENNA,), - "FPGA_sdp_info_observation_id_RW": (MAX_ANTENNA,), - "FPGA_sdp_info_antenna_band_index_R": (MAX_ANTENNA,), - "FPGA_sdp_info_antenna_band_index_RW": (MAX_ANTENNA,), - } - self._reshape_attributes_out = { - "FPGA_sdp_info_observation_id_R": (N_pn,), - "FPGA_sdp_info_observation_id_RW": (N_pn,), - "FPGA_sdp_info_antenna_band_index_R": (N_pn,), - "FPGA_sdp_info_antenna_band_index_RW": (N_pn,), - } - - def map_read(self, mapped_attribute: str, sdp_results: List[any]) -> List[any]: - """Perform a mapped read for the attribute using the recv_results - - :param mapped_attribute: attribute identifier as present in - py:attribute:`~_default_value_mapping_read` - :param sdp_results: Results as read from SDP device - :return: sdp_results as mapped given attribute dimensions and fpga mapping - """ - - default_values = self._default_value_mapping_read[mapped_attribute] - - return self._mapped_r_values( - sdp_results, default_values, self._value_mapper[mapped_attribute] - ) - - def map_write(self, mapped_attribute: str, set_values: List[any]) -> List[any]: - """Perform a mapped write for the attribute using the set_values - - :param mapped_attribute: attribute identifier as present in - py:attribute:`~_default_value_mapping_write` - :param set_values: The values to be set for the specified attribute - :return: set_values as mapped given attribute dimensions and control mapping - """ - - default_values = self._masked_value_mapping_write[mapped_attribute] - - mapped_values = self._mapped_rw_values( - set_values, default_values, self._value_mapper[mapped_attribute] - ) - - return mapped_values - - def _mapped_r_values( - self, sdp_results: List[any], default_values: List[any], value_mapping - ): - """Mapping for read using :py:attribute:`~_fpga_mapping` and shallow copy""" - - mapped_values = numpy.array(default_values) - for idx, mapping in enumerate(value_mapping): - fpga = mapping[0] - _input = mapping[1] - if _input > -1: - mapped_values[idx] = sdp_results[fpga] - - return mapped_values - - def _mapped_rw_values( - self, set_values: List[any], default_values: List[any], value_mapping - ): - """Mapping for write using :py:attribute:`~_fpga_mapping` and shallow copy""" - - mapped_values = numpy.array(default_values) - - for idx, mapping in enumerate(value_mapping): - fpga = mapping[0] - _input = mapping[1] - if _input > -1: - mapped_values[fpga] = set_values[idx] - - return mapped_values - - -class AntennaToRecvMapper(object): - _VALUE_MAP_NONE_96 = numpy.full(MAX_ANTENNA, None) - _VALUE_MAP_NONE_96_32 = numpy.full((MAX_ANTENNA, N_elements * N_pol), None) - _VALUE_MAP_NONE_96_2 = numpy.full((MAX_ANTENNA, N_pol), None) - - def __init__( - self, control_to_recv_mapping, power_to_recv_mapping, number_of_receivers - ): - number_of_antennas = len(control_to_recv_mapping) - - # Reduce memory footprint of default values by creating single instance of - # common fields - value_map_ant_32_int = numpy.zeros( - [number_of_antennas, N_elements * N_pol], dtype=numpy.int64 - ) - value_map_ant_32_bool = numpy.full( - (number_of_antennas, N_elements * N_pol), False - ) - value_map_ant_bool = numpy.full(number_of_antennas, False) - - self._control_mapping = control_to_recv_mapping - self._power_mapping = power_to_recv_mapping - self._number_of_receivers = number_of_receivers - self._value_mapper = { - "ANT_mask_RW": [self._control_mapping], - "HBAT_BF_delay_steps_R": [self._control_mapping], - "HBAT_BF_delay_steps_RW": [self._control_mapping], - "HBAT_LED_on_R": [self._control_mapping], - "HBAT_LED_on_RW": [self._control_mapping], - "HBAT_PWR_LNA_on_R": [self._control_mapping], - "HBAT_PWR_LNA_on_RW": [self._control_mapping], - "HBAT_PWR_on_R": [self._control_mapping], - "HBAT_PWR_on_RW": [self._control_mapping], - "RCU_PWR_ANT_on_R": [self._power_mapping], - "RCU_PWR_ANT_on_RW": [self._power_mapping], - "RCU_band_select_R": [self._control_mapping], - "RCU_band_select_RW": [self._control_mapping], - "RCU_attenuator_dB_R": [self._power_mapping, self._control_mapping], - "RCU_attenuator_dB_RW": [self._power_mapping, self._control_mapping], - "RCU_DTH_freq_R": [self._control_mapping], - "RCU_DTH_freq_RW": [self._control_mapping], - "RCU_DTH_on_R": [self._control_mapping], - "RCU_PCB_ID_R": [self._power_mapping, self._control_mapping], - "RCU_PCB_version_R": [self._power_mapping, self._control_mapping], - "RCU_DTH_PWR_R": [self._control_mapping], - "RCU_DTH_PWR_RW": [self._control_mapping], - "RCU_DAB_filter_on_R": [self._control_mapping], - "RCU_DAB_filter_on_RW": [self._control_mapping], - } - self._default_value_mapping_read = { - "ANT_mask_RW": value_map_ant_bool, - "HBAT_BF_delay_steps_R": value_map_ant_32_int, - "HBAT_BF_delay_steps_RW": value_map_ant_32_int, - "HBAT_LED_on_R": value_map_ant_32_bool, - "HBAT_LED_on_RW": value_map_ant_32_bool, - "HBAT_PWR_LNA_on_R": value_map_ant_32_bool, - "HBAT_PWR_LNA_on_RW": value_map_ant_32_bool, - "HBAT_PWR_on_R": value_map_ant_32_bool, - "HBAT_PWR_on_RW": value_map_ant_32_bool, - "RCU_PWR_ANT_on_R": value_map_ant_bool, - "RCU_PWR_ANT_on_RW": value_map_ant_bool, - "RCU_band_select_R": numpy.zeros(number_of_antennas, dtype=numpy.int64), - "RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64), - "RCU_attenuator_dB_R": numpy.zeros( - (number_of_antennas, N_pol), dtype=numpy.int64 - ), - "RCU_attenuator_dB_RW": numpy.zeros( - (number_of_antennas, N_pol), dtype=numpy.int64 - ), - "RCU_DTH_freq_R": numpy.zeros(number_of_antennas, dtype=numpy.float64), - "RCU_DTH_freq_RW": numpy.zeros(number_of_antennas, dtype=numpy.float64), - "RCU_DTH_on_R": value_map_ant_bool, - "RCU_DTH_PWR_R": numpy.zeros(number_of_antennas, dtype=numpy.float64), - "RCU_DTH_PWR_RW": numpy.zeros(number_of_antennas, dtype=numpy.float64), - "RCU_DAB_filter_on_R": value_map_ant_bool, - "RCU_DAB_filter_on_RW": value_map_ant_bool, - "RCU_PCB_ID_R": numpy.zeros((number_of_antennas, 2), dtype=numpy.int64), - "RCU_PCB_version_R": numpy.full((number_of_antennas, 2), "", dtype=str), - } - self._masked_value_mapping_write = { - "ANT_mask_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, - "HBAT_BF_delay_steps_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32, - "HBAT_LED_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32, - "HBAT_PWR_LNA_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32, - "HBAT_PWR_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32, - "RCU_PWR_ANT_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, - "RCU_band_select_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, - "RCU_attenuator_dB_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_2, - "RCU_DTH_freq_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, - "RCU_DTH_PWR_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, - "RCU_DAB_filter_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, - "RCU_PCB_ID_R": AntennaToRecvMapper._VALUE_MAP_NONE_96_2, - "RCU_PCB_version_R": AntennaToRecvMapper._VALUE_MAP_NONE_96_2, - } - self._reshape_attributes_in = { - "ANT_mask_RW": (MAX_ANTENNA,), - "HBAT_BF_delay_steps_RW": (MAX_ANTENNA, N_elements * N_pol), - "RCU_PWR_ANT_on_R": (MAX_ANTENNA,), - "RCU_PWR_ANT_on_RW": (MAX_ANTENNA,), - "RCU_band_select_R": (MAX_ANTENNA,), - "RCU_band_select_RW": (MAX_ANTENNA,), - "RCU_attenuator_dB_R": ((MAX_ANTENNA * N_pol),), - "RCU_attenuator_dB_RW": ((MAX_ANTENNA * N_pol),), - "RCU_DTH_freq_R": (MAX_ANTENNA,), - "RCU_DTH_freq_RW": (MAX_ANTENNA,), - "RCU_DTH_on_R": (MAX_ANTENNA,), - "RCU_DTH_PWR_R": (MAX_ANTENNA,), - "RCU_DTH_PWR_RW": (MAX_ANTENNA,), - "RCU_DAB_filter_on_R": (MAX_ANTENNA,), - "RCU_DAB_filter_on_RW": (MAX_ANTENNA,), - "RCU_PCB_ID_R": (MAX_ANTENNA,), - "RCU_PCB_version_R": (MAX_ANTENNA,), - } - self._reshape_attributes_out = { - "ANT_mask_RW": (N_rcu, N_rcu_inp), - "HBAT_BF_delay_steps_RW": (MAX_ANTENNA, N_elements * N_pol), - "RCU_PWR_ANT_on_R": (N_rcu, N_rcu_inp), - "RCU_PWR_ANT_on_RW": (N_rcu, N_rcu_inp), - "RCU_band_select_R": (N_rcu, N_rcu_inp), - "RCU_band_select_RW": (N_rcu, N_rcu_inp), - "RCU_attenuator_dB_R": (N_rcu, N_rcu_inp), - "RCU_attenuator_dB_RW": (N_rcu, N_rcu_inp), - "RCU_DTH_freq_R": (N_rcu, N_rcu_inp), - "RCU_DTH_freq_RW": (N_rcu, N_rcu_inp), - "RCU_DTH_on_R": (N_rcu, N_rcu_inp), - "RCU_DTH_PWR_R": (N_rcu, N_rcu_inp), - "RCU_DTH_PWR_RW": (N_rcu, N_rcu_inp), - "RCU_DAB_filter_on_R": (N_rcu, N_rcu_inp), - "RCU_DAB_filter_on_RW": (N_rcu, N_rcu_inp), - "RCU_PCB_ID_R": (N_rcu,), - "RCU_PCB_version_R": (N_rcu,), - } - - # Attributes which need to be reshaped with a copy of their values, - # because RECV original dimension < AntennaField mapped dimension - self._fill_attributes_in = { - "RCU_attenuator_dB_R": N_pol, - "RCU_attenuator_dB_RW": N_pol, - "RCU_PCB_ID_R": N_rcu_inp, - "RCU_PCB_version_R": N_rcu_inp, - } - - # Attributes which need to be reshaped with removing a part of their duplicated - # values because RECV original dimension < Antennafield mapped dimension - self._empty_attributes_out = { - "RCU_attenuator_dB_RW": (N_rcu * N_rcu_inp, N_pol) - } - - def map_read(self, mapped_attribute: str, recv_results: List[any]) -> List[any]: - """Perform a mapped read for the attribute using the recv_results - - :param mapped_attribute: attribute identifier as present in - py:attribute:`~_default_value_mapping_read` - :param recv_results: Results as gathered by appending attributes from RECV - devices - :return: recv_results as mapped given attribute dimensions and control mapping - """ - - default_values = self._default_value_mapping_read[mapped_attribute] - - if mapped_attribute in self._fill_attributes_in: - recv_results = [ - [x] * self._fill_attributes_in[mapped_attribute] for x in recv_results - ] - - if mapped_attribute in self._reshape_attributes_in: - recv_results = numpy.reshape( - recv_results, - (self._number_of_receivers,) - + self._reshape_attributes_in[mapped_attribute], - ) - - return self._mapped_r_values( - recv_results, default_values, self._value_mapper[mapped_attribute] - ) - - def map_write(self, mapped_attribute: str, set_values: List[any]) -> List[any]: - """Perform a mapped write for the attribute using the set_values - - :param mapped_attribute: attribute identifier as present in - py:attribute:`~_default_value_mapping_write` - :param set_values: The values to be set for the specified attribute - :return: set_values as mapped given attribute dimensions and control mapping - """ - - default_values = self._masked_value_mapping_write[mapped_attribute] - - mapped_values = self._mapped_rw_values( - set_values, default_values, self._value_mapper[mapped_attribute] - ) - - if mapped_attribute in self._empty_attributes_out: - mapped_values = numpy.reshape( - mapped_values, - (self._number_of_receivers,) - + self._empty_attributes_out[mapped_attribute], - ) - # Remove last (duplicated) dimension - mapped_values = mapped_values[:, :, 0] - - if mapped_attribute in self._reshape_attributes_out: - mapped_values = numpy.reshape( - mapped_values, - (self._number_of_receivers,) - + self._reshape_attributes_out[mapped_attribute], - ) - - return mapped_values - - def _mapped_r_values( - self, - recv_results: List[any], - default_values: List[any], - value_mapping: List[any], - ): - """Mapping for read using :py:attribute:`~_control_mapping` and shallow copy""" - - mapped_values = numpy.array(default_values) - - # If mapping is based on only one map, then insert the value - # provided by the map - if len(value_mapping) == 1: - mapping = value_mapping[0] - for idx, (recv, rcu) in enumerate(mapping): - if recv > 0: - mapped_values[idx] = recv_results[recv - 1][rcu] - - # If mapping is based on both power and control maps - if len(value_mapping) == 2: - # Assuming mapper lists are always in the following order: - # [Power_Mapping, Control_Mapping] - [power_mapping, control_mapping] = value_mapping - for idx, ( - (recv_power, rcu_power), - (recv_control, rcu_control), - ) in enumerate(zip(power_mapping, control_mapping)): - # Insert the two values in the mapped array - # as (rcu_power_val, rcu_control_val) - if recv_power > 0: - mapped_values[idx][0] = recv_results[recv_power - 1][rcu_power] - if recv_control > 0: - mapped_values[idx][1] = recv_results[recv_control - 1][rcu_control] - return mapped_values - - def _mapped_rw_values( - self, set_values: List[any], default_values: List[any], value_mapping - ): - """Mapping for write using :py:attribute:`~_control_mapping` and shallow copy""" - - mapped_values = [] - - for _ in range(self._number_of_receivers): - mapped_values.append(default_values) - mapped_values = numpy.array(mapped_values) - - # If mapping is based on only one map, then insert the value - # provided by the map - if len(value_mapping) == 1: - mapping = value_mapping[0] - for idx, (recv, rcu) in enumerate(mapping): - if recv > 0: - mapped_values[recv - 1, rcu] = set_values[idx] - - # If mapping is based on both power and control maps - if len(value_mapping) == 2: - # Assuming mapper lists are always in the following order: - # [Power_Mapping, Control_Mapping] - [power_mapping, control_mapping] = value_mapping - for idx, ( - (recv_power, rcu_power), - (recv_control, rcu_control), - ) in enumerate(zip(power_mapping, control_mapping)): - if recv_power > 0: - mapped_values[recv_power - 1, rcu_power] = set_values[idx][0] - if recv_control > 0: - mapped_values[recv_control - 1, rcu_control] = set_values[idx][1] - - return mapped_values - - # ---------- # Run server # ---------- diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/mapper.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/mapper.py new file mode 100644 index 000000000..efddb1bd2 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/mapper.py @@ -0,0 +1,412 @@ +# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) +# SPDX-License-Identifier: Apache-2.0 + +""" Mapper for TANGO devices that share a common attribute + +""" +from enum import Enum +from typing import List, Optional, Dict +import numpy + +# PyTango imports +from tango import AttrWriteType +from tango.server import attribute + +from tangostationcontrol.common.constants import ( + N_elements, + MAX_ANTENNA, + N_pol, + N_rcu, + N_rcu_inp, + N_pn, +) +from tangostationcontrol.common.type_checking import type_not_sequence +from tangostationcontrol.devices.recv.recvh import RECVH +from tangostationcontrol.devices.sdp.sdp import SDP + +__all__ = ["AntennaMapper", "AntennaToRecvMapper", "AntennaToSdpMapper"] + + +class MappingKeys(Enum): + """Types of Mapping""" + + POWER = "POWER" + CONTROL = "CONTROL" + FPGA = "FPGA" + + +class MappedAttribute(attribute): + """An extension of Tango attribute""" + + def __init__( + self, + mapping_attribute, + dtype, + max_dim_x, + max_dim_y=0, + access=AttrWriteType.READ, + mapping_device="RECV", + **kwargs, + ): + if access == AttrWriteType.READ_WRITE: + + def write_func_wrapper(device, value): + cast_type = dtype + while not type_not_sequence(cast_type): + cast_type = cast_type[0] + write_func = device.set_mapped_attribute( + mapping_attribute, value, cast_type, mapping_device + ) + + self.fset = write_func_wrapper + + def read_func_wrapper(device): + return device.get_mapped_attribute(mapping_attribute, mapping_device) + + self.fget = read_func_wrapper + self.mapping_device = mapping_device + + super().__init__( + dtype=dtype, + max_dim_y=max_dim_y, + max_dim_x=max_dim_x, + access=access, + fisallowed="is_attribute_access_allowed", + **kwargs, + ) + + +class AntennaMapper: + """AntennaMapper superclass""" + + def __init__( + self, + mapping_dict: Dict[str, numpy.ndarray], + num_devices: Optional[int] = 1, + ) -> None: + self._mapping_dict = mapping_dict + self._num_devices = num_devices + self._value_mapper = {} + self._default_value_mapping_read = {} + self._masked_value_mapping_write = {} + self._reshape_attributes_in = {} + self._reshape_attributes_out = {} + self._fill_attributes_in = {} + self._empty_attributes_out = {} + + def map_read(self, mapped_attribute: str, device_values: List[any]) -> List[any]: + """Perform a mapped read for the attribute using the device_results + + :param mapped_attribute: attribute identifier as present in + py:attribute:`~_default_value_mapping_read` + :param device_values: Results as gathered by appending attributes from device + :return: device_results as mapped given attribute dimensions and control mapping + """ + default_values = self._default_value_mapping_read[mapped_attribute] + + # Attributes which need to be reshaped with a copy of their values, + # because Device original dimension < AntennaField mapped dimension + if mapped_attribute in self._fill_attributes_in: + device_values = [ + [x] * self._fill_attributes_in[mapped_attribute] for x in device_values + ] + + if mapped_attribute in self._reshape_attributes_in: + device_values = numpy.reshape( + device_values, + (self._num_devices,) + self._reshape_attributes_in[mapped_attribute], + ) + + return self._mapped_r_values( + device_values, default_values, self._value_mapper[mapped_attribute] + ) + + def map_write(self, mapped_attribute: str, set_values: List[any]) -> List[any]: + """Perform a mapped write for the attribute using the set_values + + :param mapped_attribute: attribute identifier as present in + py:attribute:`~_default_value_mapping_write` + :param set_values: The values to be set for the specified attribute + :return: set_values as mapped given attribute dimensions and control mapping + """ + default_values = self._masked_value_mapping_write[mapped_attribute] + + mapped_values = self._mapped_rw_values( + set_values, default_values, self._value_mapper[mapped_attribute] + ) + + # Attributes which need to be reshaped with removing a part of their duplicated + # values because Device original dimension < Antennafield mapped dimension + if mapped_attribute in self._empty_attributes_out: + mapped_values = numpy.reshape( + mapped_values, + (self._num_devices,) + self._empty_attributes_out[mapped_attribute], + ) + # Remove second (last) dimension + mapped_values = mapped_values[:, :, 0] + + # Only RECV mapping may have more than one devices by now + if mapped_attribute in self._reshape_attributes_out: + mapped_values = numpy.reshape( + mapped_values, + (self._num_devices,) + self._reshape_attributes_out[mapped_attribute], + ) + + return mapped_values + + def _mapped_r_values( + self, + device_values: List[any], + default_values: List[any], + value_mapping: List[any], + ): + """Mapping for read operation""" + mapped_values = numpy.array(default_values) + + # If mapping is based on only one map, then insert the value + # provided by the map + if len(value_mapping) == 1: + mapping = value_mapping[0] + for idx, (dev_conn, dev_input) in enumerate(mapping): + # Differentiate operations between RECVs and FPGAs + if dev_conn > 0 and MappingKeys.CONTROL in list( + self._mapping_dict.keys() + ): + mapped_values[idx] = device_values[dev_conn - 1][dev_input] + elif dev_input > -1 and MappingKeys.FPGA in list( + self._mapping_dict.keys() + ): + mapped_values[idx] = device_values[dev_conn] + + # If mapping is based on 2 maps + # f.e. power and control maps in AntennaToRecvMapper + if len(value_mapping) == 2: + # Assuming mapper lists are always in the following order: + # [Power_Mapping, Control_Mapping] + [power_mapping, control_mapping] = value_mapping + for idx, ( + (dev_power, dev_input_power), + (dev_control, dev_input_control), + ) in enumerate(zip(power_mapping, control_mapping)): + # Insert the two values in the mapped array + if dev_power > 0: + mapped_values[idx][0] = device_values[dev_power - 1][ + dev_input_power + ] + if dev_control > 0: + mapped_values[idx][1] = device_values[dev_control - 1][ + dev_input_control + ] + + return mapped_values + + def _mapped_rw_values( + self, + set_values: List[any], + default_values: List[any], + value_mapping: List[any], + ): + if MappingKeys.CONTROL in list(self._mapping_dict.keys()): + mapped_values = [] + for _ in range(self._num_devices): + mapped_values.append(default_values) + mapped_values = numpy.array(mapped_values) + elif MappingKeys.FPGA in list(self._mapping_dict.keys()): + mapped_values = numpy.array(default_values) + + if len(value_mapping) == 1: + mapping = value_mapping[0] + for idx, (dev_conn, dev_input) in enumerate(mapping): + # Differentiate operations between RECVs and FPGAs + if dev_conn > 0 and MappingKeys.CONTROL in list( + self._mapping_dict.keys() + ): + mapped_values[dev_conn - 1, dev_input] = set_values[idx] + elif dev_input > -1 and MappingKeys.FPGA in list( + self._mapping_dict.keys() + ): + mapped_values[dev_conn] = set_values[idx] + + if len(value_mapping) == 2: + [power_mapping, control_mapping] = value_mapping + for idx, ( + (dev_power, dev_input_power), + (dev_control, dev_input_control), + ) in enumerate(zip(power_mapping, control_mapping)): + if dev_power > 0: + mapped_values[dev_power - 1, dev_input_power] = set_values[idx][0] + if dev_control > 0: + mapped_values[dev_control - 1, dev_input_control] = set_values[idx][ + 1 + ] + + return mapped_values + + def _init_reshape_out(self, mapped_attributes: List[str], device_type: str): + """Initialise the _reshape_attributes_out dictionary, setting the + original dimension for each mapped attribute""" + device_class_map = {"RECV": RECVH, "SDP": SDP} + device_class = device_class_map[device_type] + reshape_out = {} + for attr in mapped_attributes: + # Retrieve the original attribute dimension from class attributes + attr_instance = getattr(device_class, attr) + dim_x, dim_y = attr_instance.dim_x, attr_instance.dim_y + # Insert the dimension(s) in the map + reshape_out[attr] = (dim_x,) if dim_y == 0 else (dim_y, dim_x) + return reshape_out + + +class AntennaToSdpMapper(AntennaMapper): + """Class that sets a mapping between SDP and AntennaField attributes""" + + _VALUE_MAP_NONE_16 = numpy.full(N_pn, None) + + def __init__(self, mapping_dict, mapped_dimensions) -> None: + super().__init__(mapping_dict) + + self._fpga_mapping = self._mapping_dict[MappingKeys.FPGA] + number_of_antennas = len(self._fpga_mapping) + value_map_fpga_info_int = numpy.zeros(number_of_antennas, dtype=numpy.uint32) + + self._mapped_attributes = list(mapped_dimensions) + + self._value_mapper = {} + for attr in self._mapped_attributes: + self._value_mapper[attr] = [self._fpga_mapping] + + self._default_value_mapping_read = {} + for attr in self._mapped_attributes: + self._default_value_mapping_read[attr] = value_map_fpga_info_int + + self._masked_value_mapping_write = {} + for attr in self._mapped_attributes: + # Only read/write attributes + if attr.endswith("RW"): + self._masked_value_mapping_write[ + attr + ] = AntennaToSdpMapper._VALUE_MAP_NONE_16 + + +class AntennaToRecvMapper(AntennaMapper): + """Class that sets a mapping between RECVs and AntennaField attributes""" + + _VALUE_MAP_NONE_96 = numpy.full(MAX_ANTENNA, None) + _VALUE_MAP_NONE_96_32 = numpy.full((MAX_ANTENNA, N_elements * N_pol), None) + _VALUE_MAP_NONE_96_2 = numpy.full((MAX_ANTENNA, 2), None) + + def __init__(self, mapping_dict, mapped_dimensions, number_of_receivers): + super().__init__(mapping_dict, number_of_receivers) + + self._power_mapping = self._mapping_dict[MappingKeys.POWER] + self._control_mapping = self._mapping_dict[MappingKeys.CONTROL] + number_of_antennas = len(self._control_mapping) + + # Reduce memory footprint of default values by creating single instance of + # common fields + value_map_ant_32_int = numpy.zeros( + [number_of_antennas, N_elements * N_pol], dtype=numpy.int64 + ) + value_map_ant_32_bool = numpy.full( + (number_of_antennas, N_elements * N_pol), False + ) + value_map_ant_bool = numpy.full(number_of_antennas, False) + + self._mapped_attributes = list(mapped_dimensions) + + self._value_mapper = self._init_value_mapper() + + self._default_value_mapping_read = { + "ANT_mask_RW": value_map_ant_bool, + "HBAT_BF_delay_steps_R": value_map_ant_32_int, + "HBAT_BF_delay_steps_RW": value_map_ant_32_int, + "HBAT_LED_on_R": value_map_ant_32_bool, + "HBAT_LED_on_RW": value_map_ant_32_bool, + "HBAT_PWR_LNA_on_R": value_map_ant_32_bool, + "HBAT_PWR_LNA_on_RW": value_map_ant_32_bool, + "HBAT_PWR_on_R": value_map_ant_32_bool, + "HBAT_PWR_on_RW": value_map_ant_32_bool, + "RCU_PWR_ANT_on_R": value_map_ant_bool, + "RCU_PWR_ANT_on_RW": value_map_ant_bool, + "RCU_band_select_R": numpy.zeros(number_of_antennas, dtype=numpy.int64), + "RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64), + "RCU_attenuator_dB_R": numpy.zeros( + (number_of_antennas, N_pol), dtype=numpy.int64 + ), + "RCU_attenuator_dB_RW": numpy.zeros( + (number_of_antennas, N_pol), dtype=numpy.int64 + ), + "RCU_DTH_freq_R": numpy.zeros(number_of_antennas, dtype=numpy.float64), + "RCU_DTH_freq_RW": numpy.zeros(number_of_antennas, dtype=numpy.float64), + "RCU_DTH_on_R": value_map_ant_bool, + "RCU_DTH_PWR_R": numpy.zeros(number_of_antennas, dtype=numpy.float64), + "RCU_DTH_PWR_RW": numpy.zeros(number_of_antennas, dtype=numpy.float64), + "RCU_DAB_filter_on_R": value_map_ant_bool, + "RCU_DAB_filter_on_RW": value_map_ant_bool, + "RCU_PCB_ID_R": numpy.zeros((number_of_antennas, 2), dtype=numpy.int64), + "RCU_PCB_version_R": numpy.full((number_of_antennas, 2), "", dtype=str), + } + self._masked_value_mapping_write = self._init_masked_value_write( + mapped_dimensions + ) + + # Dict of reshaped mapped attribute dimensions with special cases + self._reshape_attributes_in = mapped_dimensions + self._reshape_attributes_in["RCU_attenuator_dB_R"] = ((MAX_ANTENNA * N_pol),) + self._reshape_attributes_in["RCU_attenuator_dB_RW"] = ((MAX_ANTENNA * N_pol),) + self._reshape_attributes_in["RCU_PCB_ID_R"] = (MAX_ANTENNA,) + self._reshape_attributes_in["RCU_PCB_version_R"] = (MAX_ANTENNA,) + + # Dict of reshaped attribute dimensions following its device + self._reshape_attributes_out = self._init_reshape_out( + self._mapped_attributes, + "RECV", + ) + + # Attributes which need to be reshaped with a copy of their values, + # because RECV original dimension < AntennaField mapped dimension + self._fill_attributes_in = { + "RCU_attenuator_dB_R": N_pol, + "RCU_attenuator_dB_RW": N_pol, + "RCU_PCB_ID_R": N_rcu_inp, + "RCU_PCB_version_R": N_rcu_inp, + } + + # Attributes which need to be reshaped with removing a part of their duplicated + # values because RECV original dimension < Antennafield mapped dimension + self._empty_attributes_out = { + "RCU_attenuator_dB_RW": (N_rcu * N_rcu_inp, N_pol) + } + + def _init_masked_value_write(self, mapped_dimensions: dict): + """Create the masked value mapping dictionary for write attributes""" + special_cases = ["RCU_PCB_ID_R", "RCU_PCB_version_R"] + masked_write = {} + for attr, dim in mapped_dimensions.items(): + if attr.casefold().endswith("rw") or attr in special_cases: + if len(dim) == 1 and dim[0] == 96: + masked_write[attr] = AntennaToRecvMapper._VALUE_MAP_NONE_96 + elif len(dim) == 2 and dim[1] == 2: + masked_write[attr] = AntennaToRecvMapper._VALUE_MAP_NONE_96_2 + elif len(dim) == 2 and dim[1] == 32: + masked_write[attr] = AntennaToRecvMapper._VALUE_MAP_NONE_96_32 + return masked_write + + def _init_value_mapper(self): + """Create the value mapping dictionary""" + value_mapper = {} + double_mapping = [ + "RCU_attenuator_dB_R", + "RCU_attenuator_dB_RW", + "RCU_PCB_ID_R", + "RCU_PCB_version_R", + ] + pwr_mapping = ["RCU_PWR_ANT_on_R", "RCU_PWR_ANT_on_RW"] + for attr in self._mapped_attributes: + if attr in double_mapping: + value_mapper[attr] = [self._power_mapping, self._control_mapping] + elif attr in pwr_mapping: + value_mapper[attr] = [self._power_mapping] + else: + value_mapper[attr] = [self._control_mapping] + return value_mapper diff --git a/tangostationcontrol/test/devices/test_antennafield_device.py b/tangostationcontrol/test/devices/test_antennafield_device.py index 4b60b813e..6b0edbc15 100644 --- a/tangostationcontrol/test/devices/test_antennafield_device.py +++ b/tangostationcontrol/test/devices/test_antennafield_device.py @@ -7,6 +7,8 @@ import logging import unittest from unittest import mock +from test import base +from test.devices import device_base import numpy from tango.test_context import DeviceTestContext @@ -20,34 +22,24 @@ from tangostationcontrol.common.constants import ( ) from tangostationcontrol.devices import antennafield from tangostationcontrol.devices.antennafield import ( - AntennaToRecvMapper, - AntennaToSdpMapper, AntennaQuality, AntennaUse, + AntennaField, +) +from tangostationcontrol.devices.base_device_classes.mapper import ( + MappingKeys, + AntennaToRecvMapper, + AntennaToSdpMapper, ) -from test import base -from test.devices import device_base logger = logging.getLogger() +SDP_MAPPED_ATTRS = AntennaField.get_mapped_dimensions("SDP") +RECV_MAPPED_ATTRS = AntennaField.get_mapped_dimensions("RECV") -class TestAntennaToRecvMapper(base.TestCase): - # A mapping where Antennas are all not mapped to power RCUs - POWER_NOT_CONNECTED = [[-1, -1]] * DEFAULT_N_HBA_TILES - # A mapping where Antennas are all not mapped to control RCUs - CONTROL_NOT_CONNECTED = [[-1, -1]] * DEFAULT_N_HBA_TILES - # A mapping where first two Antennas are mapped on the first Receiver. - # The first Antenna control line on RCU 1 and the second Antenna control line - # on RCU 0. - POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * ( - DEFAULT_N_HBA_TILES - 2 - ) - CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * ( - DEFAULT_N_HBA_TILES - 2 - ) - CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1 = [[1, 3], [1, 2]] + [[-1, -1]] * ( - DEFAULT_N_HBA_TILES - 2 - ) + +class TestAntennaToSdpMapper(base.TestCase): + """Test class for AntennaToSDPMapper""" # A mapping where Antennas are all not mapped to SDP FPGAs FPGA_NOT_CONNECTED = numpy.reshape( @@ -60,16 +52,20 @@ class TestAntennaToRecvMapper(base.TestCase): ) def test_fpga_sdp_info_observation_id_no_mapping(self): - fpga_mapping = numpy.reshape(self.FPGA_NOT_CONNECTED, (-1, 2)).tolist() - mapper = AntennaToSdpMapper(fpga_mapping) + fpga_mapping = { + MappingKeys.FPGA: numpy.reshape(self.FPGA_NOT_CONNECTED, (-1, 2)).tolist() + } + mapper = AntennaToSdpMapper(fpga_mapping, SDP_MAPPED_ATTRS) sdp_values = list(range(1, N_pn + 1)) expected = [0] * DEFAULT_N_HBA_TILES actual = mapper.map_read("FPGA_sdp_info_observation_id_R", sdp_values) numpy.testing.assert_equal(expected, actual) def test_fpga_sdp_info_antenna_band_index_no_mapping(self): - fpga_mapping = numpy.reshape(self.FPGA_NOT_CONNECTED, (-1, 2)).tolist() - mapper = AntennaToSdpMapper(fpga_mapping) + fpga_mapping = { + MappingKeys.FPGA: numpy.reshape(self.FPGA_NOT_CONNECTED, (-1, 2)).tolist() + } + mapper = AntennaToSdpMapper(fpga_mapping, SDP_MAPPED_ATTRS) # all HBAs sdp_values = [1] * N_pn # expected zeros because not connected @@ -78,31 +74,106 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_fpga_sdp_info_observation_id_HBA_0_AND_1_ON_FPGA_1_AND_0(self): - fpga_mapping = numpy.reshape( - self.FPGA_HBA_0_AND_1_ON_FPGA_1_AND_0, (-1, 2) - ).tolist() - mapper = AntennaToSdpMapper(fpga_mapping) + fpga_mapping = { + MappingKeys.FPGA: numpy.reshape( + self.FPGA_HBA_0_AND_1_ON_FPGA_1_AND_0, (-1, 2) + ).tolist() + } + mapper = AntennaToSdpMapper(fpga_mapping, SDP_MAPPED_ATTRS) sdp_values = list(range(1, N_pn + 1)) # [1,2,3,...] (12,) expected = [2] + [1] + [0] * (DEFAULT_N_HBA_TILES - 2) actual = mapper.map_read("FPGA_sdp_info_observation_id_R", sdp_values) numpy.testing.assert_equal(expected, actual) def test_fpga_sdp_info_antenna_band_index_HBA_0_AND_1_ON_FPGA_1_AND_0(self): - fpga_mapping = numpy.reshape( - self.FPGA_HBA_0_AND_1_ON_FPGA_1_AND_0, (-1, 2) - ).tolist() - mapper = AntennaToSdpMapper(fpga_mapping) + fpga_mapping = { + MappingKeys.FPGA: numpy.reshape( + self.FPGA_HBA_0_AND_1_ON_FPGA_1_AND_0, (-1, 2) + ).tolist() + } + mapper = AntennaToSdpMapper(fpga_mapping, SDP_MAPPED_ATTRS) # all HBAs sdp_values = [0] + [1] + [1] * (N_pn - 2) # expected zeros because not connected expected = [1] + [0] + [0] * (DEFAULT_N_HBA_TILES - 2) actual = mapper.map_read("FPGA_sdp_info_antenna_band_index_R", sdp_values) + numpy.testing.assert_equal(actual, expected) + + def test_map_write_fpga_sdp_info_observation_id_no_mapping(self): + """Verify results None without fpga connection and array sizes""" + fpga_mapping = { + MappingKeys.FPGA: numpy.reshape(self.FPGA_NOT_CONNECTED, (-1, 2)).tolist() + } + mapper = AntennaToSdpMapper(fpga_mapping, SDP_MAPPED_ATTRS) + set_values = [None] * DEFAULT_N_HBA_TILES + expected = [None] * N_pn + actual = mapper.map_write("FPGA_sdp_info_observation_id_RW", set_values) + numpy.testing.assert_equal(expected, actual) + + def test_map_write_fpga_sdp_info_observation_id_hba_0_and_1_on_fpga_1_and_0(self): + """Verify results fpga connections and array sizes""" + fpga_mapping = { + MappingKeys.FPGA: numpy.reshape( + self.FPGA_HBA_0_AND_1_ON_FPGA_1_AND_0, (-1, 2) + ).tolist() + } + mapper = AntennaToSdpMapper(fpga_mapping, SDP_MAPPED_ATTRS) + set_values = [2] + [1] + [-1] * (DEFAULT_N_HBA_TILES - 2) + expected = [1] + [2] + [None] * (N_pn - 2) + actual = mapper.map_write("FPGA_sdp_info_observation_id_RW", set_values) + numpy.testing.assert_equal(expected, actual) + + def test_map_write_fpga_sdp_info_antenna_band_index_no_mapping(self): + """Verify results None without fpga connection and array sizes""" + fpga_mapping = { + MappingKeys.FPGA: numpy.reshape(self.FPGA_NOT_CONNECTED, (-1, 2)).tolist() + } + mapper = AntennaToSdpMapper(fpga_mapping, SDP_MAPPED_ATTRS) + set_values = [None] * DEFAULT_N_HBA_TILES + expected = [None] * N_pn + actual = mapper.map_write("FPGA_sdp_info_antenna_band_index_RW", set_values) + numpy.testing.assert_equal(expected, actual) + + def test_map_write_fpga_sdp_info_antenna_band_index_hba_0_and_1_on_fpga_1_and_0( + self, + ): + """Verify results fpga connections and array sizes""" + fpga_mapping = { + MappingKeys.FPGA: numpy.reshape( + self.FPGA_HBA_0_AND_1_ON_FPGA_1_AND_0, (-1, 2) + ).tolist() + } + mapper = AntennaToSdpMapper(fpga_mapping, SDP_MAPPED_ATTRS) + set_values = [1] + [0] + [1] * (DEFAULT_N_HBA_TILES - 2) + expected = [0] + [1] + [None] * (N_pn - 2) + actual = mapper.map_write("FPGA_sdp_info_antenna_band_index_RW", set_values) numpy.testing.assert_equal(expected, actual) + +class TestAntennaToRecvMapper(base.TestCase): + # A mapping where Antennas are all not mapped to power RCUs + POWER_NOT_CONNECTED = [[-1, -1]] * DEFAULT_N_HBA_TILES + # A mapping where Antennas are all not mapped to control RCUs + CONTROL_NOT_CONNECTED = [[-1, -1]] * DEFAULT_N_HBA_TILES + # A mapping where first two Antennas are mapped on the first Receiver. + # The first Antenna control line on RCU 1 and the second Antenna control line + # on RCU 0. + POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * ( + DEFAULT_N_HBA_TILES - 2 + ) + CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * ( + DEFAULT_N_HBA_TILES - 2 + ) + CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1 = [[1, 3], [1, 2]] + [[-1, -1]] * ( + DEFAULT_N_HBA_TILES - 2 + ) + def test_ant_read_mask_r_no_mapping(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [False] * MAX_ANTENNA, @@ -114,11 +185,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_ant_read_mask_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 3, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [False, True, False] + [False, False, False] * (N_rcu - 1), @@ -131,18 +202,22 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_rcu_band_select_no_mapping(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [[0] * MAX_ANTENNA, [0] * MAX_ANTENNA, [0] * MAX_ANTENNA] expected = [0] * DEFAULT_N_HBA_TILES actual = mapper.map_read("RCU_band_select_RW", receiver_values) numpy.testing.assert_equal(expected, actual) def test_bf_read_delay_steps_r_no_mapping(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[0] * N_rcu] * MAX_ANTENNA, @@ -154,11 +229,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_bf_read_delay_steps_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 3, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[2] * N_rcu, [1] * N_rcu] + [[0] * N_rcu] * (MAX_ANTENNA - 2), @@ -173,9 +248,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_bf_read_delay_steps_rw_no_mapping(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[0] * N_rcu] * MAX_ANTENNA, @@ -187,11 +264,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_bf_read_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 3, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[2] * N_rcu, [1] * N_rcu] + [[0] * N_rcu] * (MAX_ANTENNA - 2), @@ -206,9 +283,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_led_on_r_unmapped(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[False] * N_rcu] * MAX_ANTENNA, @@ -220,11 +299,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_led_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 3, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[False, True] * 16, [True, False] * 16] @@ -240,9 +319,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_led_on_rw_unmapped(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[False] * N_rcu] * MAX_ANTENNA, @@ -254,11 +335,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 3, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[False, True] * 16, [True, False] * 16] @@ -274,9 +355,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_lna_on_r_unmapped(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[False] * N_rcu] * MAX_ANTENNA, @@ -288,11 +371,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_lna_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 3, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[False, True] * 16, [True, False] * 16] @@ -308,9 +391,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_lna_on_rw_unmapped(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[False] * N_rcu] * MAX_ANTENNA, @@ -322,11 +407,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 3, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[False, True] * 16, [True, False] * 16] @@ -342,9 +427,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_on_r_unmapped(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[False] * N_rcu] * MAX_ANTENNA, @@ -356,11 +443,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 3, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[False, True] * 16, [True, False] * 16] @@ -376,9 +463,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_on_rw_unmapped(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[False] * N_rcu] * MAX_ANTENNA, @@ -390,11 +479,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_read_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 3, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ [[False, True] * 16, [True, False] * 16] @@ -411,9 +500,11 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_read_rcu_id_unmapped(self): """Test whether RCU_PCB_ID_R is correctly read with no mapping""" - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [list(range(32)), [0] * 32, [0] * 32] expected = [[0] * 2] * DEFAULT_N_HBA_TILES actual = mapper.map_read("RCU_PCB_ID_R", receiver_values) @@ -422,11 +513,11 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_read_rcu_id_control_connected_and_power_disconnected(self): """Test whether RCU_PCB_ID_R is correctly read with control mapping and no power mapping""" - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 3, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [list(range(32)), [0] * 32, [0] * 32] expected = [[0, 3], [0, 2]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) actual = mapper.map_read("RCU_PCB_ID_R", receiver_values) @@ -435,11 +526,11 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_read_rcu_id_control_disconnected_and_power_connected(self): """Test whether RCU_PCB_ID_R is correctly read with power mapping and no control mapping""" - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, - self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - 3, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [list(range(32)), [0] * 32, [0] * 32] expected = [[1, 0], [0, 0]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) actual = mapper.map_read("RCU_PCB_ID_R", receiver_values) @@ -447,12 +538,12 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_read_rcu_id_control_and_power_connected(self): """Test whether RCU_PCB_ID_R is correctly read with control mapping - and no power mapping""" - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1, - self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - 3, - ) + and power mapping""" + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1, + MappingKeys.POWER: self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [list(range(32)), [0] * 32, [0] * 32] expected = [[1, 3], [0, 2]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) actual = mapper.map_read("RCU_PCB_ID_R", receiver_values) @@ -460,9 +551,11 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_read_rcu_attenuator_unmapped(self): """Test whether RCU_attenuator_dB_R is correctly read with no mapping""" - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 3) receiver_values = [ list(range(MAX_ANTENNA)), [0] * MAX_ANTENNA, @@ -475,11 +568,11 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_read_rcu_attenuator_control_connected_and_power_disconnected(self): """Test whether RCU_attenuator_dB_R is correctly read with control mapping and no power mapping""" - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 1, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) receiver_values = [list(range(MAX_ANTENNA))] expected = [[0, 1], [0, 0]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) actual = mapper.map_read("RCU_attenuator_dB_R", receiver_values) @@ -488,11 +581,11 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_read_rcu_attenuator_control_disconnected_and_power_connected(self): """Test whether RCU_attenuator_dB_R is correctly read with power mapping and no control mapping""" - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, - self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - 1, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) receiver_values = [list(range(MAX_ANTENNA))] expected = [[1, 0], [0, 0]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) actual = mapper.map_read("RCU_attenuator_dB_R", receiver_values) @@ -501,11 +594,11 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_read_rcu_attenuator_control_and_power_connected(self): """Test whether RCU_attenuator_dB_R is correctly read with both control mapping and power mapping""" - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1, - self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - 1, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1, + MappingKeys.POWER: self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) receiver_values = [list(range(MAX_ANTENNA))] expected = [[1, 3], [0, 2]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) actual = mapper.map_read("RCU_attenuator_dB_R", receiver_values) @@ -513,54 +606,13 @@ class TestAntennaToRecvMapper(base.TestCase): # Rename to write - def test_map_write_fpga_sdp_info_observation_id_no_mapping(self): - """Verify results None without fpga connection and array sizes""" - fpga_mapping = numpy.reshape(self.FPGA_NOT_CONNECTED, (-1, 2)).tolist() - mapper = AntennaToSdpMapper(fpga_mapping) - set_values = [None] * DEFAULT_N_HBA_TILES - expected = [None] * N_pn - actual = mapper.map_write("FPGA_sdp_info_observation_id_RW", set_values) - numpy.testing.assert_equal(expected, actual) - - def test_map_write_fpga_sdp_info_observation_id_hba_0_and_1_on_fpga_1_and_0(self): - """Verify results fpga connections and array sizes""" - fpga_mapping = numpy.reshape( - self.FPGA_HBA_0_AND_1_ON_FPGA_1_AND_0, (-1, 2) - ).tolist() - mapper = AntennaToSdpMapper(fpga_mapping) - set_values = [2] + [1] + [-1] * (DEFAULT_N_HBA_TILES - 2) - expected = [1] + [2] + [None] * (N_pn - 2) - actual = mapper.map_write("FPGA_sdp_info_observation_id_RW", set_values) - numpy.testing.assert_equal(expected, actual) - - def test_map_write_fpga_sdp_info_antenna_band_index_no_mapping(self): - """Verify results None without fpga connection and array sizes""" - fpga_mapping = numpy.reshape(self.FPGA_NOT_CONNECTED, (-1, 2)).tolist() - mapper = AntennaToSdpMapper(fpga_mapping) - set_values = [None] * DEFAULT_N_HBA_TILES - expected = [None] * N_pn - actual = mapper.map_write("FPGA_sdp_info_antenna_band_index_RW", set_values) - numpy.testing.assert_equal(expected, actual) - - def test_map_write_fpga_sdp_info_antenna_band_index_hba_0_and_1_on_fpga_1_and_0( - self, - ): - """Verify results fpga connections and array sizes""" - fpga_mapping = numpy.reshape( - self.FPGA_HBA_0_AND_1_ON_FPGA_1_AND_0, (-1, 2) - ).tolist() - mapper = AntennaToSdpMapper(fpga_mapping) - set_values = [1] + [0] + [1] * (DEFAULT_N_HBA_TILES - 2) - expected = [0] + [1] + [None] * (N_pn - 2) - actual = mapper.map_write("FPGA_sdp_info_antenna_band_index_RW", set_values) - numpy.testing.assert_equal(expected, actual) - def test_map_write_ant_mask_rw_no_mapping_and_one_receiver(self): """Verify results None without control and array sizes""" - - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [None] * DEFAULT_N_HBA_TILES expected = [[[None, None, None]] * N_rcu] @@ -569,10 +621,11 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_ant_mask_rw_no_mapping_and_two_receivers(self): """Verify results None without control and array sizes""" - - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 2) set_values = [None] * DEFAULT_N_HBA_TILES expected = [[[None, None, None]] * N_rcu] * 2 @@ -580,11 +633,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_ant_mask_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 1, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [True, False] + [None] * (DEFAULT_N_HBA_TILES - 2) expected = [[[False, True, None]] + [[None, None, None]] * (N_rcu - 1)] @@ -592,9 +645,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_pwr_ant_on_no_mapping_and_one_receiver(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [None] * DEFAULT_N_HBA_TILES expected = [[[None, None, None]] * N_rcu] @@ -602,9 +657,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_pwr_ant_on_no_mapping_and_two_receivers(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 2) set_values = [None] * DEFAULT_N_HBA_TILES expected = [[[None, None, None]] * N_rcu] * 2 @@ -612,11 +669,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_pwr_ant_on_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, - self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - 1, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [1, 0] + [None] * (DEFAULT_N_HBA_TILES - 2) expected = [[[0, 1, None]] + [[None, None, None]] * (N_rcu - 1)] @@ -624,9 +681,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_band_select_no_mapping_and_one_receiver(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [None] * DEFAULT_N_HBA_TILES expected = [[[None, None, None]] * N_rcu] @@ -634,9 +693,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_band_select_no_mapping_and_two_receivers(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 2) set_values = [None] * DEFAULT_N_HBA_TILES expected = [[[None, None, None]] * N_rcu] * 2 @@ -644,11 +705,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_band_select_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 1, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [1, 0] + [None] * (DEFAULT_N_HBA_TILES - 2) expected = [[[0, 1, None]] + [[None, None, None]] * (N_rcu - 1)] @@ -656,9 +717,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_bf_delay_steps_rw_no_mapping_and_one_receiver(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [[1] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA] @@ -666,9 +729,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_bf_delay_steps_rw_no_mapping_and_two_receivers(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 2) set_values = [[1] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] @@ -676,11 +741,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_bf_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 1, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [[1] * N_rcu, [2] * N_rcu] + [[None] * N_rcu] * ( DEFAULT_N_HBA_TILES - 2 @@ -690,9 +755,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_no_mapping_and_one_receiver(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA] @@ -700,9 +767,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_no_mapping_and_two_receivers(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 2) set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] @@ -710,11 +779,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 1, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * ( DEFAULT_N_HBA_TILES - 2 @@ -727,9 +796,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_no_mapping_and_one_receiver(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA] @@ -737,9 +808,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_no_mapping_and_two_receivers(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 2) set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] @@ -747,11 +820,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 1, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * ( DEFAULT_N_HBA_TILES - 2 @@ -764,18 +837,22 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_on_rw_no_mapping_and_one_receiver(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA] actual = mapper.map_write("HBAT_PWR_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_lna_on_rw_no_mapping_and_two_receivers(self): - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 2) set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA] @@ -783,11 +860,11 @@ class TestAntennaToRecvMapper(base.TestCase): numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 1, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * ( DEFAULT_N_HBA_TILES - 2 @@ -801,9 +878,11 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_rcu_attenuator_unmapped(self): """Test whether RCU_attenuator_dB_RW is correctly written with no mapping""" - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1 - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [[1, 1]] * DEFAULT_N_HBA_TILES expected = [[[None] * N_rcu_inp] * N_rcu] @@ -814,11 +893,11 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_rcu_attenuator_control_connected_and_power_disconnected(self): """Test whether RCU_attenuator_dB_RW is correctly written with control mapping and no power mapping""" - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - self.POWER_NOT_CONNECTED, - 1, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + MappingKeys.POWER: self.POWER_NOT_CONNECTED, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = [[1, 2], [3, 4]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) expected = [[[4, 2, None]] + [[None] * N_rcu_inp] * (N_rcu - 1)] @@ -829,11 +908,11 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_rcu_attenuator_control_disconnected_and_power_connected(self): """Test whether RCU_attenuator_dB_RW is correctly written with power mapping and no control mapping""" - mapper = AntennaToRecvMapper( - self.CONTROL_NOT_CONNECTED, - self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - 1, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_NOT_CONNECTED, + MappingKeys.POWER: self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = set_values = [[1, 2], [3, 4]] + [[0, 0]] * ( DEFAULT_N_HBA_TILES - 2 @@ -846,11 +925,11 @@ class TestAntennaToRecvMapper(base.TestCase): def test_map_write_rcu_attenuator_control_connected_and_power_connected(self): """Test whether RCU_attenuator_dB_RW is correctly written with power mapping and no control mapping""" - mapper = AntennaToRecvMapper( - self.CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1, - self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, - 1, - ) + recv_mapping = { + MappingKeys.CONTROL: self.CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1, + MappingKeys.POWER: self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + } + mapper = AntennaToRecvMapper(recv_mapping, RECV_MAPPED_ATTRS, 1) set_values = set_values = [[1, 2], [3, 4]] + [[0, 0]] * ( DEFAULT_N_HBA_TILES - 2 -- GitLab