Skip to content
Snippets Groups Projects
Commit e6c3eedc authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'L2SS-1218-refactor-antennamapper' into 'master'

Resolve L2SS-1218 "Refactor antennamapper"

Closes L2SS-1218 and L2SS-1426

See merge request !663
parents 3df8d2f6 2c725877
No related branches found
No related tags found
1 merge request!663Resolve L2SS-1218 "Refactor antennamapper"
...@@ -115,6 +115,8 @@ Next change the version in the following places: ...@@ -115,6 +115,8 @@ Next change the version in the following places:
# Release Notes # Release Notes
* 0.20.1 Create an abstract AntennaMapper class which implements behavior of both AntennaToSdpMapper
and AntennaToRecvMapper
* 0.20.0 Complete implementation of station-state transitions in StationManager device. * 0.20.0 Complete implementation of station-state transitions in StationManager device.
Unified power management under power_hardware_on/off(), dropping prepare_hardware(), Unified power management under power_hardware_on/off(), dropping prepare_hardware(),
disable_hardware(). disable_hardware().
......
0.20.0 0.20.1
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
""" Mapper for TANGO devices that share a common attribute
"""
from enum import Enum
from typing import List, Optional, Dict
import numpy
# PyTango imports
from tango import AttrWriteType
from tango.server import attribute
from tangostationcontrol.common.constants import (
N_elements,
MAX_ANTENNA,
N_pol,
N_rcu,
N_rcu_inp,
N_pn,
)
from tangostationcontrol.common.type_checking import type_not_sequence
from tangostationcontrol.devices.recv.recvh import RECVH
from tangostationcontrol.devices.sdp.sdp import SDP
__all__ = ["AntennaMapper", "AntennaToRecvMapper", "AntennaToSdpMapper"]
class MappingKeys(Enum):
"""Types of Mapping"""
POWER = "POWER"
CONTROL = "CONTROL"
FPGA = "FPGA"
class MappedAttribute(attribute):
"""An extension of Tango attribute"""
def __init__(
self,
mapping_attribute,
dtype,
max_dim_x,
max_dim_y=0,
access=AttrWriteType.READ,
mapping_device="RECV",
**kwargs,
):
if access == AttrWriteType.READ_WRITE:
def write_func_wrapper(device, value):
cast_type = dtype
while not type_not_sequence(cast_type):
cast_type = cast_type[0]
write_func = device.set_mapped_attribute(
mapping_attribute, value, cast_type, mapping_device
)
self.fset = write_func_wrapper
def read_func_wrapper(device):
return device.get_mapped_attribute(mapping_attribute, mapping_device)
self.fget = read_func_wrapper
self.mapping_device = mapping_device
super().__init__(
dtype=dtype,
max_dim_y=max_dim_y,
max_dim_x=max_dim_x,
access=access,
fisallowed="is_attribute_access_allowed",
**kwargs,
)
class AntennaMapper:
"""AntennaMapper superclass"""
def __init__(
self,
mapping_dict: Dict[str, numpy.ndarray],
num_devices: Optional[int] = 1,
) -> None:
self._mapping_dict = mapping_dict
self._num_devices = num_devices
self._value_mapper = {}
self._default_value_mapping_read = {}
self._masked_value_mapping_write = {}
self._reshape_attributes_in = {}
self._reshape_attributes_out = {}
self._fill_attributes_in = {}
self._empty_attributes_out = {}
def map_read(self, mapped_attribute: str, device_values: List[any]) -> List[any]:
"""Perform a mapped read for the attribute using the device_results
:param mapped_attribute: attribute identifier as present in
py:attribute:`~_default_value_mapping_read`
:param device_values: Results as gathered by appending attributes from device
:return: device_results as mapped given attribute dimensions and control mapping
"""
default_values = self._default_value_mapping_read[mapped_attribute]
# Attributes which need to be reshaped with a copy of their values,
# because Device original dimension < AntennaField mapped dimension
if mapped_attribute in self._fill_attributes_in:
device_values = [
[x] * self._fill_attributes_in[mapped_attribute] for x in device_values
]
if mapped_attribute in self._reshape_attributes_in:
device_values = numpy.reshape(
device_values,
(self._num_devices,) + self._reshape_attributes_in[mapped_attribute],
)
return self._mapped_r_values(
device_values, default_values, self._value_mapper[mapped_attribute]
)
def map_write(self, mapped_attribute: str, set_values: List[any]) -> List[any]:
"""Perform a mapped write for the attribute using the set_values
:param mapped_attribute: attribute identifier as present in
py:attribute:`~_default_value_mapping_write`
:param set_values: The values to be set for the specified attribute
:return: set_values as mapped given attribute dimensions and control mapping
"""
default_values = self._masked_value_mapping_write[mapped_attribute]
mapped_values = self._mapped_rw_values(
set_values, default_values, self._value_mapper[mapped_attribute]
)
# Attributes which need to be reshaped with removing a part of their duplicated
# values because Device original dimension < Antennafield mapped dimension
if mapped_attribute in self._empty_attributes_out:
mapped_values = numpy.reshape(
mapped_values,
(self._num_devices,) + self._empty_attributes_out[mapped_attribute],
)
# Remove second (last) dimension
mapped_values = mapped_values[:, :, 0]
# Only RECV mapping may have more than one devices by now
if mapped_attribute in self._reshape_attributes_out:
mapped_values = numpy.reshape(
mapped_values,
(self._num_devices,) + self._reshape_attributes_out[mapped_attribute],
)
return mapped_values
def _mapped_r_values(
self,
device_values: List[any],
default_values: List[any],
value_mapping: List[any],
):
"""Mapping for read operation"""
mapped_values = numpy.array(default_values)
# If mapping is based on only one map, then insert the value
# provided by the map
if len(value_mapping) == 1:
mapping = value_mapping[0]
for idx, (dev_conn, dev_input) in enumerate(mapping):
# Differentiate operations between RECVs and FPGAs
if dev_conn > 0 and MappingKeys.CONTROL in list(
self._mapping_dict.keys()
):
mapped_values[idx] = device_values[dev_conn - 1][dev_input]
elif dev_input > -1 and MappingKeys.FPGA in list(
self._mapping_dict.keys()
):
mapped_values[idx] = device_values[dev_conn]
# If mapping is based on 2 maps
# f.e. power and control maps in AntennaToRecvMapper
if len(value_mapping) == 2:
# Assuming mapper lists are always in the following order:
# [Power_Mapping, Control_Mapping]
[power_mapping, control_mapping] = value_mapping
for idx, (
(dev_power, dev_input_power),
(dev_control, dev_input_control),
) in enumerate(zip(power_mapping, control_mapping)):
# Insert the two values in the mapped array
if dev_power > 0:
mapped_values[idx][0] = device_values[dev_power - 1][
dev_input_power
]
if dev_control > 0:
mapped_values[idx][1] = device_values[dev_control - 1][
dev_input_control
]
return mapped_values
def _mapped_rw_values(
self,
set_values: List[any],
default_values: List[any],
value_mapping: List[any],
):
if MappingKeys.CONTROL in list(self._mapping_dict.keys()):
mapped_values = []
for _ in range(self._num_devices):
mapped_values.append(default_values)
mapped_values = numpy.array(mapped_values)
elif MappingKeys.FPGA in list(self._mapping_dict.keys()):
mapped_values = numpy.array(default_values)
if len(value_mapping) == 1:
mapping = value_mapping[0]
for idx, (dev_conn, dev_input) in enumerate(mapping):
# Differentiate operations between RECVs and FPGAs
if dev_conn > 0 and MappingKeys.CONTROL in list(
self._mapping_dict.keys()
):
mapped_values[dev_conn - 1, dev_input] = set_values[idx]
elif dev_input > -1 and MappingKeys.FPGA in list(
self._mapping_dict.keys()
):
mapped_values[dev_conn] = set_values[idx]
if len(value_mapping) == 2:
[power_mapping, control_mapping] = value_mapping
for idx, (
(dev_power, dev_input_power),
(dev_control, dev_input_control),
) in enumerate(zip(power_mapping, control_mapping)):
if dev_power > 0:
mapped_values[dev_power - 1, dev_input_power] = set_values[idx][0]
if dev_control > 0:
mapped_values[dev_control - 1, dev_input_control] = set_values[idx][
1
]
return mapped_values
def _init_reshape_out(self, mapped_attributes: List[str], device_type: str):
"""Initialise the _reshape_attributes_out dictionary, setting the
original dimension for each mapped attribute"""
device_class_map = {"RECV": RECVH, "SDP": SDP}
device_class = device_class_map[device_type]
reshape_out = {}
for attr in mapped_attributes:
# Retrieve the original attribute dimension from class attributes
attr_instance = getattr(device_class, attr)
dim_x, dim_y = attr_instance.dim_x, attr_instance.dim_y
# Insert the dimension(s) in the map
reshape_out[attr] = (dim_x,) if dim_y == 0 else (dim_y, dim_x)
return reshape_out
class AntennaToSdpMapper(AntennaMapper):
"""Class that sets a mapping between SDP and AntennaField attributes"""
_VALUE_MAP_NONE_16 = numpy.full(N_pn, None)
def __init__(self, mapping_dict, mapped_dimensions) -> None:
super().__init__(mapping_dict)
self._fpga_mapping = self._mapping_dict[MappingKeys.FPGA]
number_of_antennas = len(self._fpga_mapping)
value_map_fpga_info_int = numpy.zeros(number_of_antennas, dtype=numpy.uint32)
self._mapped_attributes = list(mapped_dimensions)
self._value_mapper = {}
for attr in self._mapped_attributes:
self._value_mapper[attr] = [self._fpga_mapping]
self._default_value_mapping_read = {}
for attr in self._mapped_attributes:
self._default_value_mapping_read[attr] = value_map_fpga_info_int
self._masked_value_mapping_write = {}
for attr in self._mapped_attributes:
# Only read/write attributes
if attr.endswith("RW"):
self._masked_value_mapping_write[
attr
] = AntennaToSdpMapper._VALUE_MAP_NONE_16
class AntennaToRecvMapper(AntennaMapper):
"""Class that sets a mapping between RECVs and AntennaField attributes"""
_VALUE_MAP_NONE_96 = numpy.full(MAX_ANTENNA, None)
_VALUE_MAP_NONE_96_32 = numpy.full((MAX_ANTENNA, N_elements * N_pol), None)
_VALUE_MAP_NONE_96_2 = numpy.full((MAX_ANTENNA, 2), None)
def __init__(self, mapping_dict, mapped_dimensions, number_of_receivers):
super().__init__(mapping_dict, number_of_receivers)
self._power_mapping = self._mapping_dict[MappingKeys.POWER]
self._control_mapping = self._mapping_dict[MappingKeys.CONTROL]
number_of_antennas = len(self._control_mapping)
# Reduce memory footprint of default values by creating single instance of
# common fields
value_map_ant_32_int = numpy.zeros(
[number_of_antennas, N_elements * N_pol], dtype=numpy.int64
)
value_map_ant_32_bool = numpy.full(
(number_of_antennas, N_elements * N_pol), False
)
value_map_ant_bool = numpy.full(number_of_antennas, False)
self._mapped_attributes = list(mapped_dimensions)
self._value_mapper = self._init_value_mapper()
self._default_value_mapping_read = {
"ANT_mask_RW": value_map_ant_bool,
"HBAT_BF_delay_steps_R": value_map_ant_32_int,
"HBAT_BF_delay_steps_RW": value_map_ant_32_int,
"HBAT_LED_on_R": value_map_ant_32_bool,
"HBAT_LED_on_RW": value_map_ant_32_bool,
"HBAT_PWR_LNA_on_R": value_map_ant_32_bool,
"HBAT_PWR_LNA_on_RW": value_map_ant_32_bool,
"HBAT_PWR_on_R": value_map_ant_32_bool,
"HBAT_PWR_on_RW": value_map_ant_32_bool,
"RCU_PWR_ANT_on_R": value_map_ant_bool,
"RCU_PWR_ANT_on_RW": value_map_ant_bool,
"RCU_band_select_R": numpy.zeros(number_of_antennas, dtype=numpy.int64),
"RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64),
"RCU_attenuator_dB_R": numpy.zeros(
(number_of_antennas, N_pol), dtype=numpy.int64
),
"RCU_attenuator_dB_RW": numpy.zeros(
(number_of_antennas, N_pol), dtype=numpy.int64
),
"RCU_DTH_freq_R": numpy.zeros(number_of_antennas, dtype=numpy.float64),
"RCU_DTH_freq_RW": numpy.zeros(number_of_antennas, dtype=numpy.float64),
"RCU_DTH_on_R": value_map_ant_bool,
"RCU_DTH_PWR_R": numpy.zeros(number_of_antennas, dtype=numpy.float64),
"RCU_DTH_PWR_RW": numpy.zeros(number_of_antennas, dtype=numpy.float64),
"RCU_DAB_filter_on_R": value_map_ant_bool,
"RCU_DAB_filter_on_RW": value_map_ant_bool,
"RCU_PCB_ID_R": numpy.zeros((number_of_antennas, 2), dtype=numpy.int64),
"RCU_PCB_version_R": numpy.full((number_of_antennas, 2), "", dtype=str),
}
self._masked_value_mapping_write = self._init_masked_value_write(
mapped_dimensions
)
# Dict of reshaped mapped attribute dimensions with special cases
self._reshape_attributes_in = mapped_dimensions
self._reshape_attributes_in["RCU_attenuator_dB_R"] = ((MAX_ANTENNA * N_pol),)
self._reshape_attributes_in["RCU_attenuator_dB_RW"] = ((MAX_ANTENNA * N_pol),)
self._reshape_attributes_in["RCU_PCB_ID_R"] = (MAX_ANTENNA,)
self._reshape_attributes_in["RCU_PCB_version_R"] = (MAX_ANTENNA,)
# Dict of reshaped attribute dimensions following its device
self._reshape_attributes_out = self._init_reshape_out(
self._mapped_attributes,
"RECV",
)
# Attributes which need to be reshaped with a copy of their values,
# because RECV original dimension < AntennaField mapped dimension
self._fill_attributes_in = {
"RCU_attenuator_dB_R": N_pol,
"RCU_attenuator_dB_RW": N_pol,
"RCU_PCB_ID_R": N_rcu_inp,
"RCU_PCB_version_R": N_rcu_inp,
}
# Attributes which need to be reshaped with removing a part of their duplicated
# values because RECV original dimension < Antennafield mapped dimension
self._empty_attributes_out = {
"RCU_attenuator_dB_RW": (N_rcu * N_rcu_inp, N_pol)
}
def _init_masked_value_write(self, mapped_dimensions: dict):
"""Create the masked value mapping dictionary for write attributes"""
special_cases = ["RCU_PCB_ID_R", "RCU_PCB_version_R"]
masked_write = {}
for attr, dim in mapped_dimensions.items():
if attr.casefold().endswith("rw") or attr in special_cases:
if len(dim) == 1 and dim[0] == 96:
masked_write[attr] = AntennaToRecvMapper._VALUE_MAP_NONE_96
elif len(dim) == 2 and dim[1] == 2:
masked_write[attr] = AntennaToRecvMapper._VALUE_MAP_NONE_96_2
elif len(dim) == 2 and dim[1] == 32:
masked_write[attr] = AntennaToRecvMapper._VALUE_MAP_NONE_96_32
return masked_write
def _init_value_mapper(self):
"""Create the value mapping dictionary"""
value_mapper = {}
double_mapping = [
"RCU_attenuator_dB_R",
"RCU_attenuator_dB_RW",
"RCU_PCB_ID_R",
"RCU_PCB_version_R",
]
pwr_mapping = ["RCU_PWR_ANT_on_R", "RCU_PWR_ANT_on_RW"]
for attr in self._mapped_attributes:
if attr in double_mapping:
value_mapper[attr] = [self._power_mapping, self._control_mapping]
elif attr in pwr_mapping:
value_mapper[attr] = [self._power_mapping]
else:
value_mapper[attr] = [self._control_mapping]
return value_mapper
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment