Skip to content
Snippets Groups Projects
Commit c6ba1821 authored by Hannes Feldt's avatar Hannes Feldt
Browse files

Merge branch 'L2SS-1095-attr_must_update_pwr_not_ctrl' into 'master'

L2SS-1095: The AntennaField.RCU_PWR_ANT_on_R(W) attributes must update the RCU inputs...

Closes L2SS-1095

See merge request !495
parents dbd31d50 bd9d9906
No related branches found
No related tags found
1 merge request!495L2SS-1095: The AntennaField.RCU_PWR_ANT_on_R(W) attributes must update the RCU inputs...
root = true
[*]
charset = utf-8
end_of_line = lf
indent_size = 4
indent_style = space
insert_final_newline = true
trim_trailing_whitespace = true
max_line_length = 120
tab_width = 4
ij_continuation_indent_size = 8
ij_formatter_off_tag = @formatter:off
ij_formatter_on_tag = @formatter:on
ij_formatter_tags_enabled = false
ij_smart_tabs = false
ij_visual_guides = none
ij_wrap_on_typing = false
[{*.bash,*.sh,*.zsh}]
indent_size = 2
tab_width = 2
ij_shell_binary_ops_start_line = false
ij_shell_keep_column_alignment_padding = false
ij_shell_minify_program = false
ij_shell_redirect_followed_by_space = false
ij_shell_switch_cases_indented = false
ij_shell_use_unix_line_separator = true
[{*.har,*.jsb2,*.jsb3,*.json,.babelrc,.eslintrc,.stylelintrc,bowerrc,jest.config}]
indent_size = 2
ij_json_array_wrapping = split_into_lines
ij_json_keep_blank_lines_in_code = 0
ij_json_keep_indents_on_empty_lines = false
ij_json_keep_line_breaks = true
ij_json_keep_trailing_comma = false
ij_json_object_wrapping = split_into_lines
ij_json_property_alignment = do_not_align
ij_json_space_after_colon = true
ij_json_space_after_comma = true
ij_json_space_before_colon = false
ij_json_space_before_comma = false
ij_json_spaces_within_braces = false
ij_json_spaces_within_brackets = false
ij_json_wrap_long_lines = false
[{*.markdown,*.md}]
ij_markdown_force_one_space_after_blockquote_symbol = true
ij_markdown_force_one_space_after_header_symbol = true
ij_markdown_force_one_space_after_list_bullet = true
ij_markdown_force_one_space_between_words = true
ij_markdown_insert_quote_arrows_on_wrap = true
ij_markdown_keep_indents_on_empty_lines = false
ij_markdown_keep_line_breaks_inside_text_blocks = true
ij_markdown_max_lines_around_block_elements = 1
ij_markdown_max_lines_around_header = 1
ij_markdown_max_lines_between_paragraphs = 1
ij_markdown_min_lines_around_block_elements = 1
ij_markdown_min_lines_around_header = 1
ij_markdown_min_lines_between_paragraphs = 1
ij_markdown_wrap_text_if_long = true
ij_markdown_wrap_text_inside_blockquotes = true
[{*.py,*.pyw}]
max_line_length = 88
ij_python_align_collections_and_comprehensions = true
ij_python_align_multiline_imports = true
ij_python_align_multiline_parameters = true
ij_python_align_multiline_parameters_in_calls = false
ij_python_blank_line_at_file_end = true
ij_python_blank_lines_after_imports = 1
ij_python_blank_lines_after_local_imports = 0
ij_python_blank_lines_around_class = 1
ij_python_blank_lines_around_method = 1
ij_python_blank_lines_around_top_level_classes_functions = 2
ij_python_blank_lines_before_first_method = 0
ij_python_call_parameters_new_line_after_left_paren = true
ij_python_call_parameters_right_paren_on_new_line = true
ij_python_call_parameters_wrap = normal
ij_python_dict_alignment = 2
ij_python_dict_new_line_after_left_brace = true
ij_python_dict_new_line_before_right_brace = true
ij_python_dict_wrapping = 1
ij_python_from_import_new_line_after_left_parenthesis = true
ij_python_from_import_new_line_before_right_parenthesis = true
ij_python_from_import_parentheses_force_if_multiline = true
ij_python_from_import_trailing_comma_if_multiline = false
ij_python_from_import_wrapping = 1
ij_python_hang_closing_brackets = false
ij_python_keep_blank_lines_in_code = 1
ij_python_keep_blank_lines_in_declarations = 1
ij_python_keep_indents_on_empty_lines = false
ij_python_keep_line_breaks = true
ij_python_method_parameters_new_line_after_left_paren = true
ij_python_method_parameters_right_paren_on_new_line = true
ij_python_method_parameters_wrap = normal
ij_python_new_line_after_colon = true
ij_python_new_line_after_colon_multi_clause = true
ij_python_optimize_imports_always_split_from_imports = false
ij_python_optimize_imports_case_insensitive_order = false
ij_python_optimize_imports_join_from_imports_with_same_source = false
ij_python_optimize_imports_sort_by_type_first = true
ij_python_optimize_imports_sort_imports = true
ij_python_optimize_imports_sort_names_in_from_imports = false
ij_python_space_after_comma = true
ij_python_space_after_number_sign = true
ij_python_space_after_py_colon = true
ij_python_space_before_backslash = true
ij_python_space_before_comma = false
ij_python_space_before_for_semicolon = false
ij_python_space_before_lbracket = false
ij_python_space_before_method_call_parentheses = false
ij_python_space_before_method_parentheses = false
ij_python_space_before_number_sign = true
ij_python_space_before_py_colon = false
ij_python_space_within_empty_method_call_parentheses = false
ij_python_space_within_empty_method_parentheses = false
ij_python_spaces_around_additive_operators = true
ij_python_spaces_around_assignment_operators = true
ij_python_spaces_around_bitwise_operators = true
ij_python_spaces_around_eq_in_keyword_argument = false
ij_python_spaces_around_eq_in_named_parameter = false
ij_python_spaces_around_equality_operators = true
ij_python_spaces_around_multiplicative_operators = true
ij_python_spaces_around_power_operator = false
ij_python_spaces_around_relational_operators = true
ij_python_spaces_around_shift_operators = true
ij_python_spaces_within_braces = false
ij_python_spaces_within_brackets = false
ij_python_spaces_within_method_call_parentheses = false
ij_python_spaces_within_method_parentheses = false
ij_python_use_continuation_indent_for_arguments = true
ij_python_use_continuation_indent_for_collection_and_comprehensions = false
ij_python_use_continuation_indent_for_parameters = true
ij_python_wrap_long_lines = false
[{*.yaml,*.yml}]
indent_size = 2
ij_yaml_align_values_properties = do_not_align
ij_yaml_autoinsert_sequence_marker = true
ij_yaml_block_mapping_on_new_line = false
ij_yaml_indent_sequence_value = true
ij_yaml_keep_indents_on_empty_lines = false
ij_yaml_keep_line_breaks = true
ij_yaml_sequence_on_new_line = false
ij_yaml_space_before_colon = false
ij_yaml_spaces_within_braces = true
ij_yaml_spaces_within_brackets = true
# -*- coding: utf-8 -*-
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
""" AntennaField Device Server for LOFAR2.0
"""
import logging
from enum import IntEnum
from math import pi
import numpy
from typing import List
import numpy
# PyTango imports
from tango import DeviceProxy, DevSource, AttrWriteType, DevVarFloatArray, DevVarLongArray, DebugIt
from tango import (
DeviceProxy, DevSource, AttrWriteType, DevVarFloatArray,
DevVarLongArray, DebugIt
)
from tango.server import device_property, attribute, command
# Additional import
......@@ -23,24 +25,32 @@ from tangostationcontrol.beam.hba_tile import HBATAntennaOffsets
from tangostationcontrol.common.cables import cable_types
from tangostationcontrol.common.calibration import delay_compensation
from tangostationcontrol.common.calibration import loss_compensation
from tangostationcontrol.common.constants import N_elements, MAX_ANTENNA, N_pol, N_xyz, N_latlong, N_rcu, N_rcu_inp, N_pn, S_pn, N_subbands, VALUES_PER_COMPLEX
from tangostationcontrol.common.constants import (
N_elements, MAX_ANTENNA, N_pol, N_xyz,
N_latlong, N_rcu, N_rcu_inp, N_pn, S_pn, N_subbands, VALUES_PER_COMPLEX
)
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions
from tangostationcontrol.common.lofar_logging import (
device_logging_to_python,
log_exceptions
)
from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
from tangostationcontrol.common.type_checking import type_not_sequence
from tangostationcontrol.devices.device_decorators import fault_on_error, only_in_states
from tangostationcontrol.devices.lofar_device import LOFARDevice
from tangostationcontrol.devices.sdp.common import subband_frequency, real_imag_to_weights
from tangostationcontrol.devices.sdp.common import (
subband_frequency,
real_imag_to_weights
)
from tangostationcontrol.devices.sdp.sdp import SDP
import logging
logger = logging.getLogger()
__all__ = ["AntennaField", "AntennaToRecvMapper", "main"]
class AntennaUse(IntEnum):
AUTO = 0 # use antenna only if its OK or SUSPICIOUS
AUTO = 0 # use antenna only if it's OK or SUSPICIOUS
ON = 1 # force antenna to be on, regardless of quality
OFF = 2 # force antenna to be off, regardless of quality
......@@ -53,7 +63,10 @@ class AntennaQuality(IntEnum):
class MappedAttribute(attribute):
def __init__(self, mapping_attribute, dtype, max_dim_x, max_dim_y=0, access=AttrWriteType.READ, **kwargs):
def __init__(
self, mapping_attribute, dtype, max_dim_x, max_dim_y=0,
access=AttrWriteType.READ, **kwargs
):
if access == AttrWriteType.READ_WRITE:
@fault_on_error()
......@@ -61,7 +74,9 @@ class MappedAttribute(attribute):
cast_type = dtype
while not type_not_sequence(cast_type):
cast_type = cast_type[0]
write_func = device.set_mapped_attribute(mapping_attribute, value, cast_type)
write_func = device.set_mapped_attribute(
mapping_attribute, value, cast_type
)
self.fset = write_func_wrapper
......@@ -71,8 +86,10 @@ class MappedAttribute(attribute):
self.fget = read_func_wrapper
super().__init__(dtype=dtype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access,
fisallowed="is_attribute_access_allowed", **kwargs)
super().__init__(
dtype=dtype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access,
fisallowed="is_attribute_access_allowed", **kwargs
)
@device_logging_to_python()
......@@ -141,7 +158,9 @@ class AntennaField(LOFARDevice):
)
Antenna_Cables = device_property(
doc=f"Which cables connect each antenna to the RCU. Both polarisations are assumed to be connected using the same type of cable. Needs to be any of ({', '.join(cable_types.keys())}).",
doc=f"Which cables connect each antenna to the RCU. Both polarisations are "
f"assumed to be connected using the same type of cable. Needs to be "
f"any of ({', '.join(cable_types.keys())}).",
dtype='DevVarStringArray',
mandatory=False,
default_value=numpy.array(["0m"] * MAX_ANTENNA)
......@@ -155,25 +174,37 @@ class AntennaField(LOFARDevice):
)
Calibration_SDP_Subband_Weights_50MHz = device_property(
doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW columns of each polarisation of each antenna, at 50 MHz. Each polarisation is represented by a (real, imag) pair for every subband.",
doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW "
f"columns of each polarisation of each antenna, at 50 MHz. Each "
f"polarisation is represented by a (real, imag) pair for every "
f"subband.",
dtype='DevVarFloatArray',
mandatory=False
)
Calibration_SDP_Subband_Weights_150MHz = device_property(
doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW columns of each polarisation of each antenna, at 150 MHz. Each polarisation is represented by a (real, imag) pair for every subband.",
doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW "
f"columns of each polarisation of each antenna, at 150 MHz. Each "
f"polarisation is represented by a (real, imag) pair for every "
f"subband.",
dtype='DevVarFloatArray',
mandatory=False
)
Calibration_SDP_Subband_Weights_200MHz = device_property(
doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW columns of each polarisation of each antenna, at 200 MHz. Each polarisation is represented by a (real, imag) pair for every subband.",
doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW "
f"columns of each polarisation of each antenna, at 200 MHz. Each "
f"polarisation is represented by a (real, imag) pair for every "
f"subband.",
dtype='DevVarFloatArray',
mandatory=False
)
Calibration_SDP_Subband_Weights_250MHz = device_property(
doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW columns of each polarisation of each antenna, at 250 MHz. Each polarisation is represented by a (real, imag) pair for every subband.",
doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW "
f"columns of each polarisation of each antenna, at 250 MHz. Each "
f"polarisation is represented by a (real, imag) pair for every "
f"subband.",
dtype='DevVarFloatArray',
mandatory=False
)
......@@ -181,7 +212,8 @@ class AntennaField(LOFARDevice):
# ----- Position information
Antenna_Field_Reference_ITRF = device_property(
doc="ITRF position (XYZ) of each antenna field (leave empty to auto-derive from ETRS)",
doc="ITRF position (XYZ) of each antenna field (leave empty to auto-derive "
"from ETRS)",
dtype='DevVarFloatArray',
mandatory=False
)
......@@ -193,7 +225,8 @@ class AntennaField(LOFARDevice):
)
Antenna_Reference_ITRF = device_property(
doc="ITRF position (XYZ) of each Antenna (leave empty to auto-derive from ETRS)",
doc="ITRF position (XYZ) of each Antenna (leave empty to auto-derive from "
"ETRS)",
dtype='DevVarFloatArray',
mandatory=False
)
......@@ -205,14 +238,16 @@ class AntennaField(LOFARDevice):
)
ITRF_Reference_Frame = device_property(
doc="Reference frame in which the ITRF coordinates are provided, or are to be computed from ETRS89",
doc="Reference frame in which the ITRF coordinates are provided, or are to "
"be computed from ETRS89",
dtype='DevString',
mandatory=False,
default_value="ITRF2005"
)
ITRF_Reference_Epoch = device_property(
doc="Reference epoch in which the ITRF coordinates are provided, or are to be extrapolated from ETRS89",
doc="Reference epoch in which the ITRF coordinates are provided, or are to "
"be extrapolated from ETRS89",
dtype='DevFloat',
mandatory=False,
default_value=2015.5
......@@ -226,17 +261,21 @@ class AntennaField(LOFARDevice):
)
PQR_to_ETRS_rotation_matrix = device_property(
doc="Field-specific rotation matrix to convert PQR offsets to ETRS/ITRF offsets.",
doc="Field-specific rotation matrix to convert PQR offsets to ETRS/ITRF "
"offsets.",
dtype='DevVarFloatArray',
mandatory=False,
default_value=numpy.array([ # PQR->ETRS rotation matrix for the core stations
default_value=numpy.array(
[ # PQR->ETRS rotation matrix for the core stations
[-0.1195951054, -0.7919544517, 0.5987530018],
[0.9928227484, -0.0954186800, 0.0720990002],
[0.0000330969, 0.6030782884, 0.7976820024]]).flatten()
[0.0000330969, 0.6030782884, 0.7976820024]]
).flatten()
)
HBAT_base_antenna_offsets = device_property(
doc="Offsets of the antennas in a HBAT, with respect to its reference center (16x3).",
doc="Offsets of the antennas in a HBAT, with respect to its reference "
"center (16x3).",
dtype='DevVarFloatArray',
mandatory=False,
default_value=HBATAntennaOffsets.HBAT1_BASE_ANTENNA_OFFSETS.flatten()
......@@ -246,7 +285,10 @@ class AntennaField(LOFARDevice):
Antenna_to_SDP_Mapping = device_property(
dtype=(numpy.int32,),
doc='The mapping of Antennas to FPGA input pairs. Each FPGA can handle 6 inputs, and SDP has 16 FPGAs. Each antenna is represented with a (fpga, input) value pair. The array is flattened, so must be reshaped upon use. An input=-1 means the antenna is unconnected.',
doc='The mapping of Antennas to FPGA input pairs. Each FPGA can handle 6 '
'inputs, and SDP has 16 FPGAs. Each antenna is represented with a '
'(fpga, input) value pair. The array is flattened, so must be reshaped '
'upon use. An input=-1 means the antenna is unconnected.',
default_value=numpy.array([-1] * MAX_ANTENNA * 2, dtype=numpy.int32)
)
......@@ -261,72 +303,133 @@ class AntennaField(LOFARDevice):
Power_to_RECV_mapping = device_property(
dtype=(numpy.int32,),
doc='The mapping of Antenna power lines to RECV mapping. Each RECV can handle 96 inputs. The Antenna number is the index and the value shows to which receiver device it is connected and on which input. The first integer is the input. The second integer is the RECV id. Example: [0, 3] = first receiver of property RECV_devices with input 3. -1 means that the Antenna is not connected. The property is stored in a one dimensional structure. It needs to be reshaped to a list of lists of two items.',
doc='The mapping of Antenna power lines to RECV mapping. Each RECV can '
'handle 96 inputs. The Antenna number is the index and the value shows '
'to which receiver device it is connected and on which input. The '
'first integer is the input. The second integer is the RECV id. '
'Example: [0, 3] = first receiver of property RECV_devices with '
'input 3. -1 means that the Antenna is not connected. The property is '
'stored in a one dimensional structure. It needs to be reshaped to a '
'list of lists of two items.',
mandatory=False,
default_value=[-1] * MAX_ANTENNA * 2
)
Control_to_RECV_mapping = device_property(
dtype=(numpy.int32,),
doc='The mapping of Antenna control lines to RECV mapping. Each RECV can handle 96 inputs. The Antenna number is the index and the value shows to which receiver device it is connected and on which input. The first integer is the input. The second interger is the RECV id. Example: [1, 3] = STAT/RECV/1 with input 3. -1 means that the Antenna is not connected. The property is stored in a one dimensional structure. It needs to be reshaped to a list of lists of two items.',
doc='The mapping of Antenna control lines to RECV mapping. Each RECV can '
'handle 96 inputs. The Antenna number is the index and the value shows '
'to which receiver device it is connected and on which input. The '
'first integer is the input. The second interger is the RECV id. '
'Example: [1, 3] = STAT/RECV/1 with input 3. -1 means that the Antenna '
'is not connected. The property is stored in a one dimensional '
'structure. It needs to be reshaped to a list of lists of two items.',
mandatory=False,
default_value=[-1] * MAX_ANTENNA * 2
)
RECV_devices = device_property(
dtype=(str,),
doc='Which Recv devices are in use by the AntennaField. The order is important and it should match up with the order of the mapping.',
doc='Which Recv devices are in use by the AntennaField. The order is '
'important and it should match up with the order of the mapping.',
mandatory=False,
default_value=[]
)
# ----- Generic information
Antenna_Type_R = attribute(doc='The type of antenna in this field (LBA or HBA).',
dtype=str)
Antenna_Names_R = attribute(access=AttrWriteType.READ,
dtype=(str,), max_dim_x=MAX_ANTENNA)
Antenna_to_SDP_Mapping_R = attribute(doc='To which (fpga, input) pair each antenna is connected. -1=unconnected.',
dtype=((numpy.int32,),), max_dim_x=N_pol, max_dim_y=MAX_ANTENNA)
Antenna_Type_R = attribute(
doc='The type of antenna in this field (LBA or HBA).',
dtype=str
)
Antenna_Names_R = attribute(
access=AttrWriteType.READ,
dtype=(str,), max_dim_x=MAX_ANTENNA
)
Antenna_to_SDP_Mapping_R = attribute(
doc='To which (fpga, input) pair each antenna is connected. '
'-1=unconnected.',
dtype=((numpy.int32,),), max_dim_x=N_pol, max_dim_y=MAX_ANTENNA
)
# ----- Cable information (between antenna and RCU)
Antenna_Cables_R = attribute(doc=f"Which cables connect each antenna to the RCU. Both polarisations are assumed to be connected using the same type of cable. Needs to be any of ({', '.join(cable_types.keys())}).",
dtype=(str,), max_dim_x=MAX_ANTENNA)
Antenna_Cables_Delay_R = attribute(doc=f"Delay caused by the cable between antenna and RCU, in seconds.",
dtype=(numpy.float64,), max_dim_x=MAX_ANTENNA, unit="s")
Antenna_Cables_Loss_R = attribute(doc=f"Loss caused by the cable between antenna and RCU, in dB.",
dtype=(numpy.float64,), max_dim_x=MAX_ANTENNA, unit="dB")
Antenna_Cables_R = attribute(
doc=f"Which cables connect each antenna to the RCU. Both polarisations are "
f"assumed to be connected using the same type of cable. Needs to be "
f"any of ({', '.join(cable_types.keys())}).",
dtype=(str,), max_dim_x=MAX_ANTENNA
)
Antenna_Cables_Delay_R = attribute(
doc=f"Delay caused by the cable between antenna and RCU, in seconds.",
dtype=(numpy.float64,), max_dim_x=MAX_ANTENNA, unit="s"
)
Antenna_Cables_Loss_R = attribute(
doc=f"Loss caused by the cable between antenna and RCU, in dB.",
dtype=(numpy.float64,), max_dim_x=MAX_ANTENNA, unit="dB"
)
# ----- Calibration information
Calibration_SDP_Signal_Input_Samples_Delay_R = attribute(doc=f"Number of samples that each antenna signal should be delayed to line up. To be applied on sdp.FPGA_signal_input_samples_delay_RW.",
dtype=(numpy.uint32,), max_dim_x=MAX_ANTENNA, unit="samples")
Calibration_RCU_Attenuation_dB_R = attribute(doc=f"Amount of dB with which each antenna signal must be adjusted to line up. To be applied on recv.RCU_attenuator_dB_RW.",
dtype=(numpy.uint32,), max_dim_x=MAX_ANTENNA, unit="dB")
Calibration_SDP_Signal_Input_Samples_Delay_R = attribute(
doc=f"Number of samples that each antenna signal should be delayed to line "
f"up. To be applied on sdp.FPGA_signal_input_samples_delay_RW.",
dtype=(numpy.uint32,), max_dim_x=MAX_ANTENNA, unit="samples"
)
Calibration_RCU_Attenuation_dB_R = attribute(
doc=f"Amount of dB with which each antenna signal must be adjusted to line "
f"up. To be applied on recv.RCU_attenuator_dB_RW.",
dtype=(numpy.uint32,), max_dim_x=MAX_ANTENNA, unit="dB"
)
Calibration_SDP_Fine_Calibration_Default_R = attribute(
doc=f"Computed calibration values for the fine calibration of each antenna. Each antenna is represented by a (delay, phase_offset, amplitude_scaling) triplet.",
dtype=((numpy.float64,),), max_dim_y=MAX_ANTENNA * N_pol, max_dim_x=3)
doc=f"Computed calibration values for the fine calibration of each "
f"antenna. Each antenna is represented by a (delay, phase_offset, "
f"amplitude_scaling) triplet.",
dtype=((numpy.float64,),), max_dim_y=MAX_ANTENNA * N_pol, max_dim_x=3
)
Calibration_SDP_Subband_Weights_Default_R = attribute(
doc=f"Calibration values for the rows in sdp.FPGA_subband_weights_RW relevant for our antennas, as computed. Each subband of each polarisation of each antenna is represented by a real_imag number (real, imag).",
dtype=((numpy.float64,),), max_dim_y=MAX_ANTENNA * N_pol, max_dim_x=N_subbands * VALUES_PER_COMPLEX)
doc=f"Calibration values for the rows in sdp.FPGA_subband_weights_RW "
f"relevant for our antennas, as computed. Each subband of each "
f"polarisation of each antenna is represented by a real_imag number "
f"(real, imag).",
dtype=((numpy.float64,),), max_dim_y=MAX_ANTENNA * N_pol,
max_dim_x=N_subbands * VALUES_PER_COMPLEX
)
Calibration_SDP_Subband_Weights_R = attribute(
doc=f"Calibration values for the rows in sdp.FPGA_subband_weights_RW relevant for our antennas. Each subband of each polarisation of each antenna is represented by a real_imag number (real, imag). Returns the measured values from Calibration_SDP_Subband_Weights_XXXMHz if available, and values computed from Calibration_SDP_Fine_Calibration_Default_R otherwise.",
dtype=((numpy.float64,),), max_dim_y=MAX_ANTENNA * N_pol, max_dim_x=N_subbands * VALUES_PER_COMPLEX)
doc=f"Calibration values for the rows in sdp.FPGA_subband_weights_RW "
f"relevant for our antennas. Each subband of each polarisation of "
f"each antenna is represented by a real_imag number (real, imag). "
f"Returns the measured values from "
f"Calibration_SDP_Subband_Weights_XXXMHz if available, and values "
f"computed from Calibration_SDP_Fine_Calibration_Default_R otherwise.",
dtype=((numpy.float64,),), max_dim_y=MAX_ANTENNA * N_pol,
max_dim_x=N_subbands * VALUES_PER_COMPLEX
)
# ----- Quality and usage information
Antenna_Quality_R = attribute(doc='The quality of each antenna. 0=OK, 1=SUSPICIOUS, 2=BROKEN, 3=BEYOND_REPAIR.',
dtype=(numpy.uint32,), max_dim_x=MAX_ANTENNA)
Antenna_Quality_R = attribute(
doc='The quality of each antenna. '
'0=OK, 1=SUSPICIOUS, 2=BROKEN, 3=BEYOND_REPAIR.',
dtype=(numpy.uint32,), max_dim_x=MAX_ANTENNA
)
Antenna_Use_R = attribute(
doc='Whether each antenna should be used. 0=AUTO, 1=ON, 2=OFF. In AUTO mode, the antenna is used if it is not BROKEN or BEYOND_REPAIR.',
dtype=(numpy.uint32,), max_dim_x=MAX_ANTENNA)
Antenna_Quality_str_R = attribute(doc='The quality of each antenna, as a string.',
dtype=(str,), max_dim_x=MAX_ANTENNA)
Antenna_Use_str_R = attribute(doc='Whether each antenna should be used, as a string.',
dtype=(str,), max_dim_x=MAX_ANTENNA)
Antenna_Usage_Mask_R = attribute(doc='Whether each antenna will be used.',
dtype=(bool,), max_dim_x=MAX_ANTENNA)
doc='Whether each antenna should be used. 0=AUTO, 1=ON, 2=OFF. In AUTO '
'mode, the antenna is used if it is not BROKEN or BEYOND_REPAIR.',
dtype=(numpy.uint32,), max_dim_x=MAX_ANTENNA
)
Antenna_Quality_str_R = attribute(
doc='The quality of each antenna, as a string.',
dtype=(str,), max_dim_x=MAX_ANTENNA
)
Antenna_Use_str_R = attribute(
doc='Whether each antenna should be used, as a string.',
dtype=(str,), max_dim_x=MAX_ANTENNA
)
Antenna_Usage_Mask_R = attribute(
doc='Whether each antenna will be used.',
dtype=(bool,), max_dim_x=MAX_ANTENNA
)
# ----- Attributes mapped on RECV
......@@ -386,38 +489,55 @@ class AntennaField(LOFARDevice):
# ----- Position information
Antenna_Field_Reference_ITRF_R = attribute(access=AttrWriteType.READ,
Antenna_Field_Reference_ITRF_R = attribute(
access=AttrWriteType.READ,
doc='Absolute reference position of antenna field, in ITRF (XYZ)',
dtype=(numpy.float64,), max_dim_x=N_xyz)
dtype=(numpy.float64,), max_dim_x=N_xyz
)
Antenna_Field_Reference_GEO_R = attribute(access=AttrWriteType.READ,
doc='Absolute reference position of antenna field, in latitude/longitude (degrees)',
dtype=(numpy.float64,), max_dim_x=N_latlong)
Antenna_Field_Reference_GEO_R = attribute(
access=AttrWriteType.READ,
doc='Absolute reference position of antenna field, '
'in latitude/longitude (degrees)',
dtype=(numpy.float64,), max_dim_x=N_latlong
)
Antenna_Field_Reference_GEOHASH_R = attribute(access=AttrWriteType.READ,
Antenna_Field_Reference_GEOHASH_R = attribute(
access=AttrWriteType.READ,
doc='Absolute reference position of antenna field, as a geohash string',
dtype=str)
dtype=str
)
HBAT_antenna_ITRF_offsets_R = attribute(access=AttrWriteType.READ,
doc='For each tile, the offsets of the antennas within that, in ITRF ("iHBADeltas"). True shape: nrtiles x 16 x 3.',
HBAT_antenna_ITRF_offsets_R = attribute(
access=AttrWriteType.READ,
doc='For each tile, the offsets of the antennas within that, '
'in ITRF ("iHBADeltas"). True shape: nrtiles x 16 x 3.',
dtype=((numpy.float64,),), max_dim_x=MAX_ANTENNA * N_xyz,
max_dim_y=MAX_ANTENNA)
max_dim_y=MAX_ANTENNA
)
Antenna_Reference_ITRF_R = attribute(access=AttrWriteType.READ,
Antenna_Reference_ITRF_R = attribute(
access=AttrWriteType.READ,
doc='Absolute reference position of each tile, in ITRF (XYZ)',
dtype=((numpy.float64,),), max_dim_x=N_xyz, max_dim_y=MAX_ANTENNA)
dtype=((numpy.float64,),), max_dim_x=N_xyz, max_dim_y=MAX_ANTENNA
)
Antenna_Reference_GEO_R = attribute(access=AttrWriteType.READ,
doc='Absolute reference position of each tile, in latitude/longitude (degrees)',
dtype=((numpy.float64,),), max_dim_x=N_latlong, max_dim_y=MAX_ANTENNA)
Antenna_Reference_GEO_R = attribute(
access=AttrWriteType.READ,
doc='Absolute reference position of each tile, '
'in latitude/longitude (degrees)',
dtype=((numpy.float64,),), max_dim_x=N_latlong, max_dim_y=MAX_ANTENNA
)
Antenna_Reference_GEOHASH_R = attribute(access=AttrWriteType.READ,
Antenna_Reference_GEOHASH_R = attribute(
access=AttrWriteType.READ,
doc='Absolute reference position of each tile, as geohash strings',
dtype=(str,), max_dim_x=MAX_ANTENNA, )
nr_antennas_R = attribute(
doc='Number of Antennas in this field',
dtype=numpy.int32)
dtype=numpy.int32
)
def read_Antenna_Type_R(self):
return self.Antenna_Type
......@@ -429,19 +549,26 @@ class AntennaField(LOFARDevice):
return self.Antenna_Cables
def read_Antenna_Cables_Delay_R(self):
return numpy.array([cable_types[antenna].delay for antenna in self.Antenna_Cables])
return numpy.array(
[cable_types[antenna].delay for antenna in self.Antenna_Cables]
)
def read_Antenna_Cables_Loss_R(self):
rcu_bands = self.read_attribute("RCU_band_select_RW")
control_to_recv_mapping = numpy.array(self.Control_to_RECV_mapping).reshape(-1,2)
control_to_recv_mapping = numpy.array(self.Control_to_RECV_mapping).reshape(
-1, 2
)
recvs = control_to_recv_mapping[:, 0] # first column is RECV device number
# Unconnected antennas return RCU band 0, which does not exist. Return 0 loss for them instead.
return numpy.array([cable_types[cable].get_loss(self.Antenna_Type, rcu_band)
# Unconnected antennas return RCU band 0, which does not exist.
# Return 0 loss for them instead.
return numpy.array(
[cable_types[cable].get_loss(self.Antenna_Type, rcu_band)
if recv > 0 else 0
for recv, cable, rcu_band in
zip(recvs, self.Antenna_Cables, rcu_bands)])
zip(recvs, self.Antenna_Cables, rcu_bands)]
)
def read_Calibration_SDP_Signal_Input_Samples_Delay_R(self):
# Correct for signal delays in the cables
......@@ -456,43 +583,65 @@ class AntennaField(LOFARDevice):
def read_Calibration_SDP_Fine_Calibration_Default_R(self):
def repeat_per_pol(arr):
# repeat values twice, and restore the shape (with the inner dimension being twice the size now)
return numpy.dstack((arr,arr)).reshape(arr.shape[0] * N_pol, *arr.shape[1:])
# repeat values twice, and restore the shape (with the inner dimension
# being twice the size now)
return numpy.dstack((arr, arr)).reshape(
arr.shape[0] * N_pol, *arr.shape[1:]
)
# ----- Delay
# correct for signal delays in the cables (equal for both polarisations)
signal_delay_seconds = repeat_per_pol(self.read_attribute("Antenna_Cables_Delay_R"))
signal_delay_seconds = repeat_per_pol(
self.read_attribute("Antenna_Cables_Delay_R")
)
# compute the required compensation
clock = self.sdp_proxy.clock_RW
_, input_delay_subsample_seconds = delay_compensation(signal_delay_seconds, clock)
_, input_delay_subsample_seconds = delay_compensation(
signal_delay_seconds, clock
)
# ----- Phase offsets
# we don't have any
phase_offsets = repeat_per_pol(numpy.zeros((self.read_attribute("nr_antennas_R"),),dtype=numpy.float64))
phase_offsets = repeat_per_pol(
numpy.zeros(
(self.read_attribute("nr_antennas_R"),), dtype=numpy.float64
)
)
# ----- Amplitude
# correct for signal loss in the cables
signal_delay_loss = repeat_per_pol(self.read_attribute("Antenna_Cables_Loss_R") - self.Field_Attenuation)
signal_delay_loss = repeat_per_pol(
self.read_attribute("Antenna_Cables_Loss_R") - self.Field_Attenuation
)
# return fine scaling to apply
_, input_attenuation_remaining_factor = loss_compensation(signal_delay_loss)
# Return as (delay, phase_offset, amplitude) triplet per polarisation
return numpy.stack((input_delay_subsample_seconds, phase_offsets, input_attenuation_remaining_factor), axis=1)
return numpy.stack(
(input_delay_subsample_seconds, phase_offsets,
input_attenuation_remaining_factor), axis=1
)
def read_Calibration_SDP_Subband_Weights_Default_R(self):
delay_phase_amplitude = self.read_attribute("Calibration_SDP_Fine_Calibration_Default_R")
delay_phase_amplitude = self.read_attribute(
"Calibration_SDP_Fine_Calibration_Default_R"
)
clock = self.sdp_proxy.clock_RW
nyquist_zone = self.sdp_proxy.nyquist_zone_R # NB: for all inputs, unmapped, not just ours
nyquist_zone = self.sdp_proxy.nyquist_zone_R # NB: for all inputs, unmapped,
# not just ours
nr_antennas = self.read_attribute("nr_antennas_R")
antenna_to_sdp_mapping = self.read_attribute("Antenna_to_SDP_Mapping_R")
subband_weights = numpy.zeros((nr_antennas, N_pol, N_subbands, VALUES_PER_COMPLEX), dtype=numpy.float64)
subband_weights = numpy.zeros(
(nr_antennas, N_pol, N_subbands, VALUES_PER_COMPLEX),
dtype=numpy.float64
)
# compute real_imag weight for each subband
for antenna_nr in range(nr_antennas):
......@@ -501,12 +650,16 @@ class AntennaField(LOFARDevice):
continue
for pol_nr in range(N_pol):
delay, phase_offset, amplitude = delay_phase_amplitude[antenna_nr * N_pol + pol_nr, :]
delay, phase_offset, amplitude = delay_phase_amplitude[
antenna_nr * N_pol + pol_nr, :]
for subband_nr in range(N_subbands):
frequency = subband_frequency(subband_nr, clock, nyquist_zone[fpga_nr, input_nr])
frequency = subband_frequency(
subband_nr, clock, nyquist_zone[fpga_nr, input_nr]
)
# turn signal backwards to compensate for the provided delay and offset
# turn signal backwards to compensate for the provided delay
# and offset
phase_shift = -(2 * numpy.pi * frequency * delay + phase_offset)
real = numpy.cos(phase_shift) * amplitude
......@@ -514,7 +667,9 @@ class AntennaField(LOFARDevice):
subband_weights[antenna_nr, pol_nr, subband_nr, :] = (real, imag)
return subband_weights.reshape(nr_antennas * N_pol, N_subbands * VALUES_PER_COMPLEX)
return subband_weights.reshape(
nr_antennas * N_pol, N_subbands * VALUES_PER_COMPLEX
)
def _rcu_band_to_calibration_table(self) -> dict:
"""
......@@ -540,7 +695,9 @@ class AntennaField(LOFARDevice):
# reshape them into their actual form
for band, caltable in rcu_band_to_caltable.items():
rcu_band_to_caltable[band] = numpy.array(caltable).reshape(nr_antennas, N_pol, N_subbands, 2)
rcu_band_to_caltable[band] = numpy.array(caltable).reshape(
nr_antennas, N_pol, N_subbands, 2
)
return rcu_band_to_caltable
......@@ -550,7 +707,9 @@ class AntennaField(LOFARDevice):
rcu_band_to_caltable = self._rcu_band_to_calibration_table()
# antenna mapping onto RECV
control_to_recv_mapping = numpy.array(self.Control_to_RECV_mapping).reshape(-1,2)
control_to_recv_mapping = numpy.array(self.Control_to_RECV_mapping).reshape(
-1, 2
)
recvs = control_to_recv_mapping[:, 0] # first column is RECV device number
# antenna mapping onto SDP
......@@ -559,9 +718,13 @@ class AntennaField(LOFARDevice):
# construct the subband weights based on the rcu_band of each antenna,
# combining the relevant tables.
nr_antennas = self.read_attribute("nr_antennas_R")
subband_weights = numpy.zeros((nr_antennas, N_pol, N_subbands, VALUES_PER_COMPLEX), dtype=numpy.float64)
subband_weights = numpy.zeros(
(nr_antennas, N_pol, N_subbands, VALUES_PER_COMPLEX),
dtype=numpy.float64
)
for antenna_nr, rcu_band in enumerate(rcu_bands):
# Skip antennas not connected to RECV. These do not have a valid RCU band selected.
# Skip antennas not connected to RECV. These do not have a valid RCU band
# selected.
if recvs[antenna_nr] == 0:
continue
......@@ -569,17 +732,22 @@ class AntennaField(LOFARDevice):
if antenna_to_sdp_mapping[antenna_nr, 1] == -1:
continue
subband_weights[antenna_nr, :, :, :] = rcu_band_to_caltable[rcu_band][antenna_nr, :, :, :]
subband_weights[antenna_nr, :, :, :] = rcu_band_to_caltable[rcu_band][
antenna_nr, :, :, :]
return subband_weights.reshape(nr_antennas * N_pol, N_subbands * VALUES_PER_COMPLEX)
return subband_weights.reshape(
nr_antennas * N_pol, N_subbands * VALUES_PER_COMPLEX
)
def read_Calibration_RCU_Attenuation_dB_R(self):
# Correct for signal loss in the cables
signal_delay_loss = self.read_attribute("Antenna_Cables_Loss_R") - self.Field_Attenuation
signal_delay_loss = self.read_attribute(
"Antenna_Cables_Loss_R"
) - self.Field_Attenuation
# return coarse attenuation to apply
input_attenuation_integer_dB, _ = loss_compensation(signal_delay_loss)
return input_attenuation_integer_dB
input_attenuation_integer_db, _ = loss_compensation(signal_delay_loss)
return input_attenuation_integer_db
def read_Antenna_Use_R(self):
return self.Antenna_Use
......@@ -598,7 +766,9 @@ class AntennaField(LOFARDevice):
quality = numpy.array(self.Antenna_Quality)
antennas_forced_on = use == AntennaUse.ON
antennas_auto_on = numpy.logical_and(use == AntennaUse.AUTO, quality <= AntennaQuality.SUSPICIOUS)
antennas_auto_on = numpy.logical_and(
use == AntennaUse.AUTO, quality <= AntennaQuality.SUSPICIOUS
)
return numpy.logical_or(antennas_forced_on, antennas_auto_on)
......@@ -622,9 +792,12 @@ class AntennaField(LOFARDevice):
if self.Antenna_Field_Reference_ITRF:
return numpy.array(self.Antenna_Field_Reference_ITRF).reshape(N_xyz)
# calculate them from ETRS coordinates if not, using the configured ITRF reference
ETRS_coordinates = numpy.array(self.Antenna_Field_Reference_ETRS).reshape(N_xyz)
return ETRS_to_ITRF(ETRS_coordinates, self.ITRF_Reference_Frame, self.ITRF_Reference_Epoch)
# calculate them from ETRS coordinates if not, using the configured ITRF
# reference
etrs_coordinates = numpy.array(self.Antenna_Field_Reference_ETRS).reshape(N_xyz)
return ETRS_to_ITRF(
etrs_coordinates, self.ITRF_Reference_Frame, self.ITRF_Reference_Epoch
)
def read_Antenna_Field_Reference_GEO_R(self):
return ITRF_to_GEO(self.read_Antenna_Field_Reference_ITRF_R())
......@@ -633,32 +806,40 @@ class AntennaField(LOFARDevice):
return GEO_to_GEOHASH(self.read_Antenna_Field_Reference_GEO_R())
def read_HBAT_antenna_ITRF_offsets_R(self):
""" Returns the ITRF differences between the center of the tile and its individual elements,
which is a (nrtiles)x16x3 matrix (16 elements with 3 ITRF coordinates each), returned
as a (nrtiles)x48 matrix.
""" Returns the ITRF differences between the center of the tile and its
individual elements, which is a (nrtiles)x16x3 matrix (16 elements with 3
ITRF coordinates each), returned as a (nrtiles)x48 matrix.
This takes the relative offsets between the elements in the tiles as described in
HBAT_base_antenna_offsets. These offsets are in PQR space, which is the plane of the station.
The tiles are rotated locally in this space according to the HBAT_PQR_rotation_angles_deg,
and finally translated into global ETRS coordinates using the PQR_to_ETRS_rotation_matrix.
This takes the relative offsets between the elements in the tiles as
described in HBAT_base_antenna_offsets. These offsets are in PQR space,
which is the plane of the station. The tiles are rotated locally in this
space according to the HBAT_PQR_rotation_angles_deg, and finally translated
into global ETRS coordinates using the PQR_to_ETRS_rotation_matrix.
The relative ITRF offsets are the same as relative ETRS offsets.
NB: In all of this, the absolute position of each tile is actually irrelevant, as all the
tiles lie on the same plane in ITRF. """
NB: In all of this, the absolute position of each tile is actually
irrelevant, as all the tiles lie on the same plane in ITRF. """
# the relative offsets between the elements is fixed in HBAT_base_antenna_offsets
base_antenna_offsets = numpy.array(self.HBAT_base_antenna_offsets).reshape(N_elements, N_xyz)
# the relative offsets between the elements is fixed in
# HBAT_base_antenna_offsets
base_antenna_offsets = numpy.array(self.HBAT_base_antenna_offsets).reshape(
N_elements, N_xyz
)
PQR_to_ETRS_rotation_matrix = numpy.array(self.PQR_to_ETRS_rotation_matrix).reshape(N_xyz, N_xyz)
pqr_to_etrs_rotation_matrix = numpy.array(
self.PQR_to_ETRS_rotation_matrix
).reshape(N_xyz, N_xyz)
# each tile has its own rotation angle, resulting in different offsets per tile
all_offsets = numpy.array(
[HBATAntennaOffsets.ITRF_offsets(
base_antenna_offsets,
angle_deg * pi / 180,
PQR_to_ETRS_rotation_matrix)
for angle_deg in self.HBAT_PQR_rotation_angles_deg])
pqr_to_etrs_rotation_matrix
)
for angle_deg in self.HBAT_PQR_rotation_angles_deg]
)
return all_offsets.reshape(-1, N_elements * N_xyz)
......@@ -667,10 +848,13 @@ class AntennaField(LOFARDevice):
if self.Antenna_Reference_ITRF:
return numpy.array(self.Antenna_Reference_ITRF).reshape(-1, N_xyz)
# calculate them from ETRS coordinates if not, using the configured ITRF reference
ETRS_coordinates = numpy.array(self.Antenna_Reference_ETRS).reshape(-1, N_xyz)
# calculate them from ETRS coordinates if not, using the configured ITRF
# reference
etrs_coordinates = numpy.array(self.Antenna_Reference_ETRS).reshape(-1, N_xyz)
return ETRS_to_ITRF(ETRS_coordinates, self.ITRF_Reference_Frame, self.ITRF_Reference_Epoch)
return ETRS_to_ITRF(
etrs_coordinates, self.ITRF_Reference_Frame, self.ITRF_Reference_Epoch
)
def read_Antenna_Reference_GEO_R(self):
return ITRF_to_GEO(self.read_Antenna_Reference_ITRF_R())
......@@ -695,7 +879,9 @@ class AntennaField(LOFARDevice):
# Reshape of mapping is needed because properties are stored in 1d arrays
control_mapping = numpy.reshape(self.Control_to_RECV_mapping, (-1, 2))
power_mapping = numpy.reshape(self.Power_to_RECV_mapping, (-1, 2))
self.__mapper = AntennaToRecvMapper(control_mapping, power_mapping, number_of_receivers)
self.__mapper = AntennaToRecvMapper(
control_mapping, power_mapping, number_of_receivers
)
def get_mapped_attribute(self, mapped_point: str):
recv_results = []
......@@ -754,10 +940,14 @@ class AntennaField(LOFARDevice):
""" Configure RECV to process our antennas. """
# Disable controlling the tiles that fall outside the mask
# WARN: Needed in configure_for_initialise but Tango does not allow to write attributes in INIT state
self.proxy.write_attribute('ANT_mask_RW', self.read_attribute('Antenna_Usage_Mask_R'))
# WARN: Needed in configure_for_initialise but Tango does not allow to write
# attributes in INIT state
self.proxy.write_attribute(
'ANT_mask_RW', self.read_attribute('Antenna_Usage_Mask_R')
)
# Turn on power to antennas that need it (and due to the ANT_mask, that we're using)
# Turn on power to antennas that need it (and due to the ANT_mask, that we're
# using)
self.proxy.write_attribute('RCU_PWR_ANT_on_RW', self.Antenna_Needs_Power)
@command()
......@@ -789,7 +979,7 @@ class AntennaField(LOFARDevice):
"""
# -----------------------------------------------------------
# Set signal-input attenuations to compensate for
# Set signal-input attenuation to compensate for
# differences in cable length.
# -----------------------------------------------------------
......@@ -813,20 +1003,26 @@ class AntennaField(LOFARDevice):
# -----------------------------------------------------------
# The delay to apply, in samples [antenna]
input_samples_delay = self.read_attribute("Calibration_SDP_Signal_Input_Samples_Delay_R")
input_samples_delay = self.read_attribute(
"Calibration_SDP_Signal_Input_Samples_Delay_R"
)
# read-modify-write on [fpga][(input, polarisation)]
fpga_signal_input_samples_delay = self.sdp_proxy.FPGA_signal_input_samples_delay_RW
fpga_signal_input_samples_delay = \
self.sdp_proxy.FPGA_signal_input_samples_delay_RW
for antenna_nr, (fpga_nr, input_nr) in enumerate(antenna_to_sdp_mapping):
if input_nr == -1:
# skip unconnected antennas
continue
# set for X polarisation
fpga_signal_input_samples_delay[fpga_nr, input_nr * N_pol + 0] = input_samples_delay[antenna_nr]
fpga_signal_input_samples_delay[fpga_nr, input_nr * N_pol + 0] = \
input_samples_delay[antenna_nr]
# set for Y polarisation
fpga_signal_input_samples_delay[fpga_nr, input_nr * N_pol + 1] = input_samples_delay[antenna_nr]
self.sdp_proxy.FPGA_signal_input_samples_delay_RW = fpga_signal_input_samples_delay
fpga_signal_input_samples_delay[fpga_nr, input_nr * N_pol + 1] = \
input_samples_delay[antenna_nr]
self.sdp_proxy.FPGA_signal_input_samples_delay_RW = \
fpga_signal_input_samples_delay
# -----------------------------------------------------------
# Compute calibration of subband weights for the remaining
......@@ -836,22 +1032,27 @@ class AntennaField(LOFARDevice):
# obtain caltable
caltable = self.read_attribute("Calibration_SDP_Subband_Weights_R")
# obtain frequency information
clock = self.sdp_proxy.clock_RW
nyquist_zone = self.sdp_proxy.nyquist_zone_R # NB: for all inputs, unmapped, not just ours
# read-modify-write on [fpga][(input, polarisation)]
fpga_subband_weights = self.sdp_proxy.FPGA_subband_weights_RW.reshape(N_pn, S_pn, N_subbands)
fpga_subband_weights = self.sdp_proxy.FPGA_subband_weights_RW.reshape(
N_pn, S_pn, N_subbands
)
for antenna_nr, (fpga_nr, input_nr) in enumerate(antenna_to_sdp_mapping):
if input_nr == -1:
# skip unconnected antennas
continue
# set weights
fpga_subband_weights[fpga_nr, input_nr * N_pol + 0, :] = real_imag_to_weights(caltable[antenna_nr * N_pol + 0, :], SDP.SUBBAND_UNIT_WEIGHT)
fpga_subband_weights[fpga_nr, input_nr * N_pol + 1, :] = real_imag_to_weights(caltable[antenna_nr * N_pol + 1, :], SDP.SUBBAND_UNIT_WEIGHT)
self.sdp_proxy.FPGA_subband_weights_RW = fpga_subband_weights.reshape(N_pn, S_pn * N_subbands)
fpga_subband_weights[fpga_nr, input_nr * N_pol + 0,
:] = real_imag_to_weights(
caltable[antenna_nr * N_pol + 0, :], SDP.SUBBAND_UNIT_WEIGHT
)
fpga_subband_weights[fpga_nr, input_nr * N_pol + 1,
:] = real_imag_to_weights(
caltable[antenna_nr * N_pol + 1, :], SDP.SUBBAND_UNIT_WEIGHT
)
self.sdp_proxy.FPGA_subband_weights_RW = fpga_subband_weights.reshape(
N_pn, S_pn * N_subbands
)
@command(dtype_in=DevVarFloatArray, dtype_out=DevVarLongArray)
def calculate_HBAT_bf_delay_steps(self, delays: numpy.ndarray):
......@@ -872,8 +1073,10 @@ class AntennaField(LOFARDevice):
continue
# convert them into delay steps
flatten_delay_steps = numpy.array(recv_proxy.calculate_HBAT_bf_delay_steps(recv_delays.flatten()),
dtype=numpy.int64)
flatten_delay_steps = numpy.array(
recv_proxy.calculate_HBAT_bf_delay_steps(recv_delays.flatten()),
dtype=numpy.int64
)
delay_steps = numpy.reshape(flatten_delay_steps, (-1, N_elements * N_pol))
# write back into same positions we collected them from
......@@ -887,18 +1090,39 @@ class AntennaToRecvMapper(object):
_VALUE_MAP_NONE_96 = numpy.full(MAX_ANTENNA, None)
_VALUE_MAP_NONE_96_32 = numpy.full((MAX_ANTENNA, N_elements * N_pol), None)
def __init__(self, control_to_recv_mapping, power_to_recv_mapping, number_of_receivers):
def __init__(
self, control_to_recv_mapping, power_to_recv_mapping, number_of_receivers
):
number_of_antennas = len(control_to_recv_mapping)
# Reduce memory footprint of default values by creating single instance of
# common fields
value_map_ant_32_int = numpy.zeros([number_of_antennas, N_elements * N_pol], dtype=numpy.int64)
value_map_ant_32_bool = numpy.full((number_of_antennas, N_elements * N_pol), False)
value_map_ant_32_int = numpy.zeros(
[number_of_antennas, N_elements * N_pol], dtype=numpy.int64
)
value_map_ant_32_bool = numpy.full(
(number_of_antennas, N_elements * N_pol), False
)
value_map_ant_bool = numpy.full(number_of_antennas, False)
self._control_mapping = control_to_recv_mapping
self._power_mapping = power_to_recv_mapping
self._number_of_receivers = number_of_receivers
self._value_mapper = {
"ANT_mask_RW" : self._control_mapping,
"HBAT_BF_delay_steps_R" : self._control_mapping,
"HBAT_BF_delay_steps_RW": self._control_mapping,
"HBAT_LED_on_R" : self._control_mapping,
"HBAT_LED_on_RW" : self._control_mapping,
"HBAT_PWR_LNA_on_R" : self._control_mapping,
"HBAT_PWR_LNA_on_RW" : self._control_mapping,
"HBAT_PWR_on_R" : self._control_mapping,
"HBAT_PWR_on_RW" : self._control_mapping,
"RCU_PWR_ANT_on_R" : self._power_mapping,
"RCU_PWR_ANT_on_RW" : self._power_mapping,
"RCU_band_select_RW" : self._control_mapping,
"RCU_attenuator_dB_RW" : self._control_mapping,
}
self._default_value_mapping_read = {
"ANT_mask_RW" : value_map_ant_bool,
"HBAT_BF_delay_steps_R" : value_map_ant_32_int,
......@@ -911,8 +1135,12 @@ class AntennaToRecvMapper(object):
"HBAT_PWR_on_RW" : value_map_ant_32_bool,
"RCU_PWR_ANT_on_R" : value_map_ant_bool,
"RCU_PWR_ANT_on_RW" : value_map_ant_bool,
"RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64),
"RCU_attenuator_dB_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64),
"RCU_band_select_RW" : numpy.zeros(
number_of_antennas, dtype=numpy.int64
),
"RCU_attenuator_dB_RW" : numpy.zeros(
number_of_antennas, dtype=numpy.int64
),
}
self._masked_value_mapping_write = {
"ANT_mask_RW" : AntennaToRecvMapper._VALUE_MAP_NONE_96,
......@@ -954,35 +1182,49 @@ class AntennaToRecvMapper(object):
default_values = self._default_value_mapping_read[mapped_attribute]
if mapped_attribute in self._reshape_attributes_in:
recv_results = numpy.reshape(recv_results,
(self._number_of_receivers,) + self._reshape_attributes_in[mapped_attribute])
recv_results = numpy.reshape(
recv_results,
(self._number_of_receivers,) + self._reshape_attributes_in[
mapped_attribute]
)
return self._mapped_r_values(recv_results, default_values)
return self._mapped_r_values(
recv_results, default_values, self._value_mapper[mapped_attribute]
)
def map_write(self, mapped_attribute: str, set_values: List[any]) -> List[any]:
"""Perform a mapped write for the attribute using the set_values
:param mapped_attribute: attribute identifier as present in py:attribute:`~_default_value_mapping_write`
:param mapped_attribute: attribute identifier as present in
py:attribute:`~_default_value_mapping_write`
:param set_values: The values to be set for the specified attribute
:return: set_values as mapped given attribute dimensions and control mapping
"""
default_values = self._masked_value_mapping_write[mapped_attribute]
mapped_values = self._mapped_rw_values(set_values, default_values)
mapped_values = self._mapped_rw_values(
set_values, default_values, self._value_mapper[mapped_attribute]
)
if mapped_attribute in self._reshape_attributes_out:
mapped_values = numpy.reshape(mapped_values,
(self._number_of_receivers,) + self._reshape_attributes_out[mapped_attribute])
mapped_values = numpy.reshape(
mapped_values,
(self._number_of_receivers,) + self._reshape_attributes_out[
mapped_attribute]
)
return mapped_values
def _mapped_r_values(self, recv_results: List[any], default_values: List[any]):
def _mapped_r_values(
self, recv_results: List[any], default_values: List[any],
value_mapping
):
"""Mapping for read using :py:attribute:`~_control_mapping` and shallow copy"""
mapped_values = numpy.array(default_values)
for idx, mapping in enumerate(self._control_mapping):
for idx, mapping in enumerate(value_mapping):
recv = mapping[0]
rcu = mapping[1]
if recv > 0:
......@@ -990,7 +1232,10 @@ class AntennaToRecvMapper(object):
return mapped_values
def _mapped_rw_values(self, set_values: List[any], default_values: List[any]):
def _mapped_rw_values(
self, set_values: List[any], default_values: List[any],
value_mapping
):
"""Mapping for write using :py:attribute:`~_control_mapping` and shallow copy"""
mapped_values = []
......@@ -999,7 +1244,7 @@ class AntennaToRecvMapper(object):
mapped_values.append(default_values)
mapped_values = numpy.array(mapped_values)
for idx, mapping in enumerate(self._control_mapping):
for idx, mapping in enumerate(value_mapping):
recv = mapping[0]
rcu = mapping[1]
if recv > 0:
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import logging
import unittest
from unittest import mock
import numpy
from tango.test_context import DeviceTestContext
from tangostationcontrol.common.constants import MAX_ANTENNA, N_rcu, DEFAULT_N_HBA_TILES
from tangostationcontrol.devices import antennafield
from tangostationcontrol.devices.antennafield import AntennaToRecvMapper, AntennaQuality, AntennaUse
from tangostationcontrol.devices.antennafield import (
AntennaToRecvMapper,
AntennaQuality, AntennaUse
)
from tangostationcontrol.test import base
from tangostationcontrol.test.devices import device_base
from tangostationcontrol.common.constants import MAX_ANTENNA, N_rcu, DEFAULT_N_HBA_TILES
logger = logging.getLogger()
......@@ -32,166 +27,263 @@ class TestAntennaToRecvMapper(base.TestCase):
# A mapping where Antennas are all not mapped to control RCUs
CONTROL_NOT_CONNECTED = [[-1, -1]] * DEFAULT_N_HBA_TILES
# A mapping where first two Antennas are mapped on the first Receiver.
# The first Antenna control line on RCU 1 and the second Antenna control line on RCU 0.
CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * (DEFAULT_N_HBA_TILES - 2)
# The first Antenna control line on RCU 1 and the second Antenna control line
# on RCU 0.
POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * (
DEFAULT_N_HBA_TILES - 2)
CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * (
DEFAULT_N_HBA_TILES - 2)
def test_ant_read_mask_r_no_mapping(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[False] * MAX_ANTENNA, [False] * MAX_ANTENNA, [False] * MAX_ANTENNA]
receiver_values = [[False] * MAX_ANTENNA, [False] * MAX_ANTENNA,
[False] * MAX_ANTENNA]
expected = [False] * DEFAULT_N_HBA_TILES
actual = mapper.map_read("ANT_mask_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_ant_read_mask_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[False, True, False] + [False, False, False] * (N_rcu - 1), [False] * MAX_ANTENNA, [False] * MAX_ANTENNA]
receiver_values = [[False, True, False] + [False, False, False] * (N_rcu - 1),
[False] * MAX_ANTENNA, [False] * MAX_ANTENNA]
expected = [True, False] + [False] * (DEFAULT_N_HBA_TILES - 2)
actual = mapper.map_read("ANT_mask_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_rcu_band_select_no_mapping(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[0] * MAX_ANTENNA, [0] * MAX_ANTENNA, [0] * MAX_ANTENNA]
expected = [0] * DEFAULT_N_HBA_TILES
actual = mapper.map_read("RCU_band_select_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_bf_read_delay_steps_r_no_mapping(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA]
receiver_values = [[[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA,
[[0] * N_rcu] * MAX_ANTENNA]
expected = [[0] * N_rcu] * DEFAULT_N_HBA_TILES
actual = mapper.map_read("HBAT_BF_delay_steps_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_bf_read_delay_steps_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[2] * N_rcu, [1] * N_rcu] + [[0] * N_rcu] * (MAX_ANTENNA - 2), [[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA]
expected = [[1] * N_rcu, [2] * N_rcu] + [[0] * N_rcu] * (DEFAULT_N_HBA_TILES - 2)
receiver_values = [
[[2] * N_rcu, [1] * N_rcu] + [[0] * N_rcu] * (MAX_ANTENNA - 2),
[[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA]
expected = [[1] * N_rcu, [2] * N_rcu] + [[0] * N_rcu] * (
DEFAULT_N_HBA_TILES - 2)
actual = mapper.map_read("HBAT_BF_delay_steps_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_bf_read_delay_steps_rw_no_mapping(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA]
receiver_values = [[[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA,
[[0] * N_rcu] * MAX_ANTENNA]
expected = [[0] * N_rcu] * DEFAULT_N_HBA_TILES
actual = mapper.map_read("HBAT_BF_delay_steps_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_bf_read_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[2] * N_rcu, [1] * N_rcu] + [[0] * N_rcu] * (MAX_ANTENNA - 2), [[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA]
expected = [[1] * N_rcu, [2] * N_rcu] + [[0] * N_rcu] * (DEFAULT_N_HBA_TILES - 2)
receiver_values = [
[[2] * N_rcu, [1] * N_rcu] + [[0] * N_rcu] * (MAX_ANTENNA - 2),
[[0] * N_rcu] * MAX_ANTENNA, [[0] * N_rcu] * MAX_ANTENNA]
expected = [[1] * N_rcu, [2] * N_rcu] + [[0] * N_rcu] * (
DEFAULT_N_HBA_TILES - 2)
actual = mapper.map_read("HBAT_BF_delay_steps_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_led_on_r_unmapped(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA]
receiver_values = [[[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA]
expected = [[False] * N_rcu] * DEFAULT_N_HBA_TILES
actual = mapper.map_read("HBAT_LED_on_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_led_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA]
receiver_values = [
[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (
MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA]
expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (DEFAULT_N_HBA_TILES - 2)
expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (
DEFAULT_N_HBA_TILES - 2)
actual = mapper.map_read("HBAT_LED_on_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_led_on_rw_unmapped(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA]
receiver_values = [[[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA]
expected = [[False] * N_rcu] * DEFAULT_N_HBA_TILES
actual = mapper.map_read("HBAT_LED_on_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA]
receiver_values = [
[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (
MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA]
expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (DEFAULT_N_HBA_TILES - 2)
expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (
DEFAULT_N_HBA_TILES - 2)
actual = mapper.map_read("HBAT_LED_on_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_lna_on_r_unmapped(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA]
receiver_values = [[[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA]
expected = [[False] * N_rcu] * DEFAULT_N_HBA_TILES
actual = mapper.map_read("HBAT_PWR_LNA_on_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_lna_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA]
receiver_values = [
[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (
MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA]
expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (DEFAULT_N_HBA_TILES - 2)
expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (
DEFAULT_N_HBA_TILES - 2)
actual = mapper.map_read("HBAT_PWR_LNA_on_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_lna_on_rw_unmapped(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA]
receiver_values = [[[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA]
expected = [[False] * N_rcu] * DEFAULT_N_HBA_TILES
actual = mapper.map_read("HBAT_PWR_LNA_on_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA]
receiver_values = [
[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (
MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA]
expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (DEFAULT_N_HBA_TILES - 2)
expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (
DEFAULT_N_HBA_TILES - 2)
actual = mapper.map_read("HBAT_PWR_LNA_on_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_on_r_unmapped(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA]
receiver_values = [[[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA]
expected = [[False] * N_rcu] * DEFAULT_N_HBA_TILES
actual = mapper.map_read("HBAT_PWR_on_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_on_r_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA]
receiver_values = [
[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (
MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA]
expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (DEFAULT_N_HBA_TILES - 2)
expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (
DEFAULT_N_HBA_TILES - 2)
actual = mapper.map_read("HBAT_PWR_on_R", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_on_rw_unmapped(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA]
receiver_values = [[[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA]
expected = [[False] * N_rcu] * DEFAULT_N_HBA_TILES
actual = mapper.map_read("HBAT_PWR_on_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
def test_map_read_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 3)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 3
)
receiver_values = [[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA, [[False] * N_rcu] * MAX_ANTENNA]
receiver_values = [
[[False, True] * 16, [True, False] * 16] + [[False] * N_rcu] * (
MAX_ANTENNA - 2), [[False] * N_rcu] * MAX_ANTENNA,
[[False] * N_rcu] * MAX_ANTENNA]
expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (DEFAULT_N_HBA_TILES - 2)
expected = [[True, False] * 16, [False, True] * 16] + [[False] * N_rcu] * (
DEFAULT_N_HBA_TILES - 2)
actual = mapper.map_read("HBAT_PWR_on_RW", receiver_values)
numpy.testing.assert_equal(expected, actual)
......@@ -200,7 +292,9 @@ class TestAntennaToRecvMapper(base.TestCase):
def test_map_write_ant_mask_rw_no_mapping_and_one_receiver(self):
"""Verify results None without control and array sizes"""
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1
)
set_values = [None] * DEFAULT_N_HBA_TILES
expected = [[[None, None, None]] * N_rcu]
......@@ -210,7 +304,9 @@ class TestAntennaToRecvMapper(base.TestCase):
def test_map_write_ant_mask_rw_no_mapping_and_two_receivers(self):
"""Verify results None without control and array sizes"""
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2
)
set_values = [None] * DEFAULT_N_HBA_TILES
expected = [[[None, None, None]] * N_rcu] * 2
......@@ -218,7 +314,10 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_ant_mask_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 1
)
set_values = [True, False] + [None] * (DEFAULT_N_HBA_TILES - 2)
expected = [[[False, True, None]] + [[None, None, None]] * (N_rcu - 1)]
......@@ -226,7 +325,9 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_pwr_ant_on_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1
)
set_values = [None] * DEFAULT_N_HBA_TILES
expected = [[[None, None, None]] * N_rcu]
......@@ -234,7 +335,9 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_pwr_ant_on_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2
)
set_values = [None] * DEFAULT_N_HBA_TILES
expected = [[[None, None, None]] * N_rcu] * 2
......@@ -242,7 +345,10 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_pwr_ant_on_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED,
self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, 1
)
set_values = [1, 0] + [None] * (DEFAULT_N_HBA_TILES - 2)
expected = [[[0, 1, None]] + [[None, None, None]] * (N_rcu - 1)]
......@@ -250,7 +356,9 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_band_select_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1
)
set_values = [None] * DEFAULT_N_HBA_TILES
expected = [[[None, None, None]] * N_rcu]
......@@ -258,7 +366,9 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_band_select_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2
)
set_values = [None] * DEFAULT_N_HBA_TILES
expected = [[[None, None, None]] * N_rcu] * 2
......@@ -266,7 +376,10 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_band_select_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 1
)
set_values = [1, 0] + [None] * (DEFAULT_N_HBA_TILES - 2)
expected = [[[0, 1, None]] + [[None, None, None]] * (N_rcu - 1)]
......@@ -274,7 +387,9 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_bf_delay_steps_rw_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1
)
set_values = [[1] * N_rcu] * DEFAULT_N_HBA_TILES
expected = [[[None] * N_rcu] * MAX_ANTENNA]
......@@ -282,7 +397,9 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_bf_delay_steps_rw_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2
)
set_values = [[1] * N_rcu] * DEFAULT_N_HBA_TILES
expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA]
......@@ -290,15 +407,21 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_bf_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 1
)
set_values = [[1] * N_rcu, [2] * N_rcu] + [[None] * N_rcu] * (DEFAULT_N_HBA_TILES - 2)
set_values = [[1] * N_rcu, [2] * N_rcu] + [[None] * N_rcu] * (
DEFAULT_N_HBA_TILES - 2)
expected = [[[2] * N_rcu, [1] * N_rcu] + [[None] * N_rcu] * (MAX_ANTENNA - 2)]
actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1
)
set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES
expected = [[[None] * N_rcu] * MAX_ANTENNA]
......@@ -306,7 +429,9 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2
)
set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES
expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA]
......@@ -314,15 +439,22 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 1
)
set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * (DEFAULT_N_HBA_TILES - 2)
expected = [[[True, False] * 16, [False, True] * 16] + [[None] * N_rcu] * (MAX_ANTENNA - 2)]
set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * (
DEFAULT_N_HBA_TILES - 2)
expected = [[[True, False] * 16, [False, True] * 16] + [[None] * N_rcu] * (
MAX_ANTENNA - 2)]
actual = mapper.map_write("HBAT_LED_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1
)
set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES
expected = [[[None] * N_rcu] * MAX_ANTENNA]
......@@ -330,7 +462,9 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2
)
set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES
expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA]
......@@ -338,22 +472,31 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 1
)
set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * (DEFAULT_N_HBA_TILES - 2)
expected = [[[True, False] * 16, [False, True] * 16] + [[None] * N_rcu] * (MAX_ANTENNA - 2)]
set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * (
DEFAULT_N_HBA_TILES - 2)
expected = [[[True, False] * 16, [False, True] * 16] + [[None] * N_rcu] * (
MAX_ANTENNA - 2)]
actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_on_rw_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1
)
set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES
expected = [[[None] * N_rcu] * MAX_ANTENNA]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_lna_on_rw_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2
)
set_values = [[None] * N_rcu] * DEFAULT_N_HBA_TILES
expected = [[[None] * N_rcu] * MAX_ANTENNA, [[None] * N_rcu] * MAX_ANTENNA]
......@@ -361,10 +504,15 @@ class TestAntennaToRecvMapper(base.TestCase):
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
mapper = AntennaToRecvMapper(
self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1,
self.POWER_NOT_CONNECTED, 1
)
set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * (DEFAULT_N_HBA_TILES - 2)
expected = [[[True, False] * 16, [False, True] * 16] + [[None] * N_rcu] * (MAX_ANTENNA - 2)]
set_values = [[False, True] * 16, [True, False] * 16] + [[None] * N_rcu] * (
DEFAULT_N_HBA_TILES - 2)
expected = [[[True, False] * 16, [False, True] * 16] + [[None] * N_rcu] * (
MAX_ANTENNA - 2)]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
......@@ -387,46 +535,83 @@ class TestAntennafieldDevice(device_base.DeviceTestCase):
def test_read_Antenna_Field_Reference(self):
"""Verify if Antenna coordinates are correctly provided"""
# Device uses ITRF coordinates by default
with DeviceTestContext(antennafield.AntennaField, properties=self.AT_PROPERTIES, process=True) as proxy:
with DeviceTestContext(
antennafield.AntennaField, properties=self.AT_PROPERTIES, process=True
) as proxy:
self.assertEqual(3.0, proxy.Antenna_Field_Reference_ITRF_R[0])
# Device derives coordinates from ETRS if ITRF ones are not found
at_properties_v2 = {'OPC_Server_Name': 'example.com', 'OPC_Server_Port': 4840, 'OPC_Time_Out': 5.0, 'Antenna_Field_Reference_ETRS' : [7.0, 7.0, 7.0]}
with DeviceTestContext(antennafield.AntennaField, properties=at_properties_v2, process=True) as proxy:
self.assertNotEqual(3.0, proxy.Antenna_Field_Reference_ITRF_R[0]) # value = 6.948998835785814
at_properties_v2 = {
'OPC_Server_Name': 'example.com', 'OPC_Server_Port': 4840,
'OPC_Time_Out' : 5.0, 'Antenna_Field_Reference_ETRS': [7.0, 7.0, 7.0]
}
with DeviceTestContext(
antennafield.AntennaField, properties=at_properties_v2, process=True
) as proxy:
self.assertNotEqual(
3.0, proxy.Antenna_Field_Reference_ITRF_R[0]
) # value = 6.948998835785814
def test_read_Antenna_Quality(self):
""" Verify if Antenna_Quality_R is correctly retrieved """
antenna_qualities = numpy.array([AntennaQuality.OK] * MAX_ANTENNA)
with DeviceTestContext(antennafield.AntennaField, properties=self.AT_PROPERTIES, process=True) as proxy:
with DeviceTestContext(
antennafield.AntennaField, properties=self.AT_PROPERTIES, process=True
) as proxy:
numpy.testing.assert_equal(antenna_qualities, proxy.Antenna_Quality_R)
def test_read_Antenna_Use(self):
""" Verify if Antenna_Use_R is correctly retrieved """
antenna_use = numpy.array([AntennaUse.AUTO] * MAX_ANTENNA)
with DeviceTestContext(antennafield.AntennaField, properties=self.AT_PROPERTIES, process=True) as proxy:
with DeviceTestContext(
antennafield.AntennaField, properties=self.AT_PROPERTIES, process=True
) as proxy:
numpy.testing.assert_equal(antenna_use, proxy.Antenna_Use_R)
def test_read_Antenna_Usage_Mask(self):
""" Verify if Antenna_Usage_Mask_R is correctly retrieved """
antenna_qualities = numpy.array([AntennaQuality.OK] * MAX_ANTENNA)
antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * (MAX_ANTENNA - 1))
antenna_properties = {'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use}
with DeviceTestContext(antennafield.AntennaField, properties={**self.AT_PROPERTIES, **antenna_properties}, process=True) as proxy:
numpy.testing.assert_equal(numpy.array([True] * MAX_ANTENNA), proxy.Antenna_Usage_Mask_R)
antenna_use = numpy.array(
[AntennaUse.ON] + [AntennaUse.AUTO] * (MAX_ANTENNA - 1)
)
antenna_properties = {
'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use
}
with DeviceTestContext(
antennafield.AntennaField,
properties={**self.AT_PROPERTIES, **antenna_properties}, process=True
) as proxy:
numpy.testing.assert_equal(
numpy.array([True] * MAX_ANTENNA), proxy.Antenna_Usage_Mask_R
)
def test_read_Antenna_Usage_Mask_only_one_functioning_antenna(self):
""" Verify if Antenna_Usage_Mask_R (only first antenna is OK) is correctly retrieved """
antenna_qualities = numpy.array([AntennaQuality.OK] + [AntennaQuality.BROKEN] * (MAX_ANTENNA - 1))
antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * (MAX_ANTENNA - 1))
antenna_properties = {'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use}
with DeviceTestContext(antennafield.AntennaField, properties={**self.AT_PROPERTIES, **antenna_properties}, process=True) as proxy:
numpy.testing.assert_equal(numpy.array([True] + [False] * (MAX_ANTENNA - 1)), proxy.Antenna_Usage_Mask_R)
antenna_qualities = numpy.array(
[AntennaQuality.OK] + [AntennaQuality.BROKEN] * (MAX_ANTENNA - 1)
)
antenna_use = numpy.array(
[AntennaUse.ON] + [AntennaUse.AUTO] * (MAX_ANTENNA - 1)
)
antenna_properties = {
'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use
}
with DeviceTestContext(
antennafield.AntennaField,
properties={**self.AT_PROPERTIES, **antenna_properties}, process=True
) as proxy:
numpy.testing.assert_equal(
numpy.array([True] + [False] * (MAX_ANTENNA - 1)),
proxy.Antenna_Usage_Mask_R
)
def test_read_Antenna_Names(self):
""" Verify if Antenna_Names_R is correctly retrieved """
antenna_names = ["C0", "C1", "C2", "C3", "C4"]
antenna_properties = {'Antenna_Names': antenna_names}
with DeviceTestContext(antennafield.AntennaField, properties={**self.AT_PROPERTIES, **antenna_properties}, process=True) as proxy:
with DeviceTestContext(
antennafield.AntennaField,
properties={**self.AT_PROPERTIES, **antenna_properties}, process=True
) as proxy:
for i in range(len(antenna_names)):
self.assertTrue(proxy.Antenna_Names_R[i] == f"C{i}")
......@@ -453,7 +638,10 @@ class TestAntennafieldDevice(device_base.DeviceTestCase):
) as proxy:
proxy.boot()
proxy.write_attribute("HBAT_PWR_on_RW", numpy.array([[False] * N_rcu] * DEFAULT_N_HBA_TILES))
proxy.write_attribute(
"HBAT_PWR_on_RW",
numpy.array([[False] * N_rcu] * DEFAULT_N_HBA_TILES)
)
numpy.testing.assert_equal(
m_proxy.return_value.write_attribute.call_args[0][1],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment