Skip to content
Snippets Groups Projects
Commit 45983926 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'master' into L2SS-1008-beamlet-source-addresses

parents b1268754 6870bb0a
Branches
Tags
1 merge request!460Resolve L2SS-1008 "Beamlet source addresses"
......@@ -25,6 +25,7 @@ from tangostationcontrol.beam.geo import ETRS_to_ITRF, ITRF_to_GEO, GEO_to_GEOHA
from tangostationcontrol.beam.hba_tile import HBATAntennaOffsets, NUMBER_OF_ELEMENTS_PER_TILE
import logging
logger = logging.getLogger()
__all__ = ["AntennaField", "AntennaToRecvMapper", "main"]
......@@ -32,17 +33,20 @@ __all__ = ["AntennaField", "AntennaToRecvMapper", "main"]
# Highest number of HBA tiles we support per AntennaField
MAX_NUMBER_OF_HBAT = 96
class AntennaUse(IntEnum):
AUTO = 0 # use antenna only if its OK or SUSPICIOUS
ON = 1 # force antenna to be on, regardless of quality
OFF = 2 # force antenna to be off, regardless of quality
class AntennaQuality(IntEnum):
OK = 0
SUSPICIOUS = 1
BROKEN = 2
BEYOND_REPAIR = 3
class mapped_attribute(attribute):
def __init__(self, mapping_attribute, dtype, max_dim_x, max_dim_y=0, access=AttrWriteType.READ, **kwargs):
......@@ -62,12 +66,12 @@ class mapped_attribute(attribute):
self.fget = read_func_wrapper
super().__init__(dtype=dtype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access, fisallowed="is_attribute_access_allowed", **kwargs)
super().__init__(dtype=dtype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access,
fisallowed="is_attribute_access_allowed", **kwargs)
@device_logging_to_python()
class AntennaField(lofar_device):
""" Manages the antennas in a single antenna field, by acting as a
a mapping onto one or more RECV devices.
......@@ -229,7 +233,8 @@ class AntennaField(lofar_device):
dtype=(str,), max_dim_x=MAX_NUMBER_OF_HBAT)
Antenna_Quality_R = attribute(doc='The quality of each antenna. 0=OK, 1=SUSPICIOUS, 2=BROKEN, 3=BEYOND_REPAIR.',
dtype=(numpy.uint32,), max_dim_x=MAX_NUMBER_OF_HBAT)
Antenna_Use_R = attribute(doc='Whether each antenna should be used. 0=AUTO, 1=ON, 2=OFF. In AUTO mode, the antenna is used if it is not BROKEN or BEYOND_REPAIR.',
Antenna_Use_R = attribute(
doc='Whether each antenna should be used. 0=AUTO, 1=ON, 2=OFF. In AUTO mode, the antenna is used if it is not BROKEN or BEYOND_REPAIR.',
dtype=(numpy.uint32,), max_dim_x=MAX_NUMBER_OF_HBAT)
Antenna_Quality_str_R = attribute(doc='The quality of each antenna, as a string.',
dtype=(str,), max_dim_x=MAX_NUMBER_OF_HBAT)
......@@ -242,18 +247,31 @@ class AntennaField(lofar_device):
Antenna_to_SDP_Mapping_R = attribute(doc='To which (fpga, input) pair each antenna is connected. -1=unconnected.',
dtype=((numpy.int32,),), max_dim_x=2, max_dim_y=MAX_NUMBER_OF_HBAT)
ANT_mask_RW = mapped_attribute("ANT_mask_RW", dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
ANT_mask_RW = mapped_attribute("ANT_mask_RW", dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT,
access=AttrWriteType.READ_WRITE)
RCU_PWR_ANT_on_R = mapped_attribute("RCU_PWR_ANT_on_R", dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT)
RCU_PWR_ANT_on_RW = mapped_attribute("RCU_PWR_ANT_on_RW", dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_BF_delay_steps_R = mapped_attribute("HBAT_BF_delay_steps_R", dtype=((numpy.int64,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_BF_delay_steps_RW = mapped_attribute("HBAT_BF_delay_steps_RW", dtype=((numpy.int64,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_LED_on_R = mapped_attribute("HBAT_LED_on_R", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_LED_on_RW = mapped_attribute("HBAT_LED_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_PWR_LNA_on_R = mapped_attribute("HBAT_PWR_LNA_on_R", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_PWR_LNA_on_RW = mapped_attribute("HBAT_PWR_LNA_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_PWR_on_R = mapped_attribute("HBAT_PWR_on_R", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_PWR_on_RW = mapped_attribute("HBAT_PWR_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
RCU_band_select_RW = mapped_attribute("RCU_band_select_RW", dtype=(numpy.int64,), max_dim_x=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
RCU_PWR_ANT_on_RW = mapped_attribute("RCU_PWR_ANT_on_RW", dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT,
access=AttrWriteType.READ_WRITE)
HBAT_BF_delay_steps_R = mapped_attribute("HBAT_BF_delay_steps_R", dtype=((numpy.int64,),),
max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_BF_delay_steps_RW = mapped_attribute("HBAT_BF_delay_steps_RW", dtype=((numpy.int64,),),
max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT,
access=AttrWriteType.READ_WRITE)
HBAT_LED_on_R = mapped_attribute("HBAT_LED_on_R", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2,
max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_LED_on_RW = mapped_attribute("HBAT_LED_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2,
max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_PWR_LNA_on_R = mapped_attribute("HBAT_PWR_LNA_on_R", dtype=((bool,),),
max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_PWR_LNA_on_RW = mapped_attribute("HBAT_PWR_LNA_on_RW", dtype=((bool,),),
max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT,
access=AttrWriteType.READ_WRITE)
HBAT_PWR_on_R = mapped_attribute("HBAT_PWR_on_R", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2,
max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_PWR_on_RW = mapped_attribute("HBAT_PWR_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2,
max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
RCU_band_select_RW = mapped_attribute("RCU_band_select_RW", dtype=(numpy.int64,), max_dim_x=MAX_NUMBER_OF_HBAT,
access=AttrWriteType.READ_WRITE)
# ----- Position information
......@@ -271,7 +289,8 @@ class AntennaField(lofar_device):
HBAT_antenna_ITRF_offsets_R = attribute(access=AttrWriteType.READ,
doc='For each tile, the offsets of the antennas within that, in ITRF ("iHBADeltas"). True shape: nrtiles x 16 x 3.',
dtype=((numpy.float64,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 3, max_dim_y=96)
dtype=((numpy.float64,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 3,
max_dim_y=96)
Antenna_Reference_ITRF_R = attribute(access=AttrWriteType.READ,
doc='Absolute reference position of each tile, in ITRF (XYZ)',
......@@ -480,7 +499,8 @@ class AntennaField(lofar_device):
continue
# convert them into delay steps
flatten_delay_steps = numpy.array(recv_proxy.calculate_HBAT_bf_delay_steps(recv_delays.flatten()), dtype=numpy.int64)
flatten_delay_steps = numpy.array(recv_proxy.calculate_HBAT_bf_delay_steps(recv_delays.flatten()),
dtype=numpy.int64)
delay_steps = numpy.reshape(flatten_delay_steps, (-1, NUMBER_OF_ELEMENTS_PER_TILE * 2))
# write back into same positions we collected them from
......@@ -490,7 +510,6 @@ class AntennaField(lofar_device):
class AntennaToRecvMapper(object):
_VALUE_MAP_NONE_96 = numpy.full(96, None)
_VALUE_MAP_NONE_96_32 = numpy.full((96, 32), None)
......@@ -508,8 +527,6 @@ class AntennaToRecvMapper(object):
self._number_of_receivers = number_of_receivers
self._default_value_mapping_read = {
"ANT_mask_RW": value_map_ant_bool,
"RCU_PWR_ANT_on_R": value_map_ant_bool,
"RCU_PWR_ANT_on_RW": value_map_ant_bool,
"HBAT_BF_delay_steps_R": value_map_ant_32_int,
"HBAT_BF_delay_steps_RW": value_map_ant_32_int,
"HBAT_LED_on_R": value_map_ant_32_bool,
......@@ -518,22 +535,30 @@ class AntennaToRecvMapper(object):
"HBAT_PWR_LNA_on_RW": value_map_ant_32_bool,
"HBAT_PWR_on_R": value_map_ant_32_bool,
"HBAT_PWR_on_RW": value_map_ant_32_bool,
"RCU_PWR_ANT_on_R": value_map_ant_bool,
"RCU_PWR_ANT_on_RW": value_map_ant_bool,
"RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64)
}
self._masked_value_mapping_write = {
"ANT_mask_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"RCU_PWR_ANT_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"HBAT_BF_delay_steps_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_LED_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_PWR_LNA_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_PWR_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"RCU_PWR_ANT_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"RCU_band_select_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
}
self._reshape_attributes_in = {
"HBAT_BF_delay_steps_RW": (96, 32),
"RCU_PWR_ANT_on_R": (96,),
"RCU_PWR_ANT_on_RW": (96,),
"RCU_band_select_RW": (96,),
}
self._reshape_attributes_out = {
"HBAT_BF_delay_steps_RW": (96, 32),
"RCU_PWR_ANT_on_R": (32, 3),
"RCU_PWR_ANT_on_RW": (32, 3),
"RCU_band_select_RW": (32, 3),
}
def map_read(self, mapped_attribute: str, recv_results: List[any]) -> List[any]:
......@@ -557,8 +582,7 @@ class AntennaToRecvMapper(object):
def map_write(self, mapped_attribute: str, set_values: List[any]) -> List[any]:
"""Perform a mapped write for the attribute using the set_values
:param mapped_attribute: attribute identifier as present in
py:attribute:`~_default_value_mapping_write`
:param mapped_attribute: attribute identifier as present in py:attribute:`~_default_value_mapping_write`
:param set_values: The values to be set for the specified attribute
:return: set_values as mapped given attribute dimensions and control mapping
"""
......@@ -603,6 +627,7 @@ class AntennaToRecvMapper(object):
return mapped_values
# ----------
# Run server
# ----------
......
......@@ -28,10 +28,12 @@ from tangostationcontrol.devices.device_decorators import only_in_states
from tangostationcontrol.devices.opcua_device import opcua_device
import logging
logger = logging.getLogger()
__all__ = ["RECV", "main"]
@device_logging_to_python()
class RECV(opcua_device):
......@@ -78,7 +80,8 @@ class RECV(opcua_device):
RCU_PWR_ANT_on_RW_default = device_property(
dtype='DevVarBooleanArray',
mandatory=False,
default_value=[False] * 96 # turn power off by default in test setups, f.e. to prevent blowing up the noise sources
default_value=[False] * 96
# turn power off by default in test setups, f.e. to prevent blowing up the noise sources
)
RECVTR_monitor_rate_RW_default = device_property(
......@@ -135,31 +138,44 @@ class RECV(opcua_device):
# ----------
# Attributes
# ----------
ANT_mask_RW = attribute_wrapper(comms_annotation=["ANT_mask_RW" ],datatype=bool , dims=(96,), access=AttrWriteType.READ_WRITE)
ANT_mask_RW = attribute_wrapper(comms_annotation=["ANT_mask_RW"], datatype=bool, dims=(96,),
access=AttrWriteType.READ_WRITE)
# The HBAT beamformer delays represent 32 delays for each of the 96 inputs.
# The 32 delays deconstruct as delays[polarisation][dipole], and each delay is the number of 'delay steps' to apply (0.5ns for HBAT1).
HBAT_BF_delay_steps_R = attribute_wrapper(comms_annotation=["HBAT_BF_delay_steps_R" ],datatype=numpy.int64 , dims=(96,16,2))
HBAT_BF_delay_steps_RW = attribute_wrapper(comms_annotation=["HBAT_BF_delay_steps_RW" ],datatype=numpy.int64 , dims=(96,16,2), access=AttrWriteType.READ_WRITE)
HBAT_LED_on_R = attribute_wrapper(comms_annotation=["HBAT_LED_on_R" ],datatype=bool , dims=(96,16,2))
HBAT_LED_on_RW = attribute_wrapper(comms_annotation=["HBAT_LED_on_RW" ],datatype=bool , dims=(96,16,2), access=AttrWriteType.READ_WRITE)
HBAT_PWR_LNA_on_R = attribute_wrapper(comms_annotation=["HBAT_PWR_LNA_on_R" ],datatype=bool , dims=(96,16,2))
HBAT_PWR_LNA_on_RW = attribute_wrapper(comms_annotation=["HBAT_PWR_LNA_on_RW" ],datatype=bool , dims=(96,16,2), access=AttrWriteType.READ_WRITE)
HBAT_PWR_on_R = attribute_wrapper(comms_annotation=["HBAT_PWR_on_R" ],datatype=bool , dims=(96,16,2))
HBAT_PWR_on_RW = attribute_wrapper(comms_annotation=["HBAT_PWR_on_RW" ],datatype=bool , dims=(96,16,2), access=AttrWriteType.READ_WRITE)
RCU_ADC_locked_R = attribute_wrapper(comms_annotation=["RCU_ADC_locked_R" ],datatype=bool , dims=(96,))
RCU_attenuator_dB_R = attribute_wrapper(comms_annotation=["RCU_attenuator_dB_R" ],datatype=numpy.int64 , dims=(96,))
RCU_attenuator_dB_RW = attribute_wrapper(comms_annotation=["RCU_attenuator_dB_RW" ],datatype=numpy.int64 , dims=(96,), access=AttrWriteType.READ_WRITE)
RCU_band_select_R = attribute_wrapper(comms_annotation=["RCU_band_select_R" ],datatype=numpy.int64 , dims=(96,))
RCU_band_select_RW = attribute_wrapper(comms_annotation=["RCU_band_select_RW" ],datatype=numpy.int64 , dims=(96,), access=AttrWriteType.READ_WRITE)
RCU_DTH_freq_R = attribute_wrapper(comms_annotation=["RCU_DTH_freq_R" ],datatype=numpy.int64 , dims=(96,))
RCU_DTH_freq_RW = attribute_wrapper(comms_annotation=["RCU_DTH_freq_RW" ],datatype=numpy.int64 , dims=(96,), access=AttrWriteType.READ_WRITE)
RCU_DTH_on_R = attribute_wrapper(comms_annotation=["RCU_DTH_on_R" ],datatype=bool , dims=(96,))
HBAT_BF_delay_steps_R = attribute_wrapper(comms_annotation=["HBAT_BF_delay_steps_R"], datatype=numpy.int64,
dims=(96, 2, 16))
HBAT_BF_delay_steps_RW = attribute_wrapper(comms_annotation=["HBAT_BF_delay_steps_RW"], datatype=numpy.int64,
dims=(96, 2, 16), access=AttrWriteType.READ_WRITE)
HBAT_LED_on_R = attribute_wrapper(comms_annotation=["HBAT_LED_on_R"], datatype=bool, dims=(96, 2, 16))
HBAT_LED_on_RW = attribute_wrapper(comms_annotation=["HBAT_LED_on_RW"], datatype=bool, dims=(96, 2, 16),
access=AttrWriteType.READ_WRITE)
HBAT_PWR_LNA_on_R = attribute_wrapper(comms_annotation=["HBAT_PWR_LNA_on_R"], datatype=bool, dims=(96, 2, 16))
HBAT_PWR_LNA_on_RW = attribute_wrapper(comms_annotation=["HBAT_PWR_LNA_on_RW"], datatype=bool, dims=(96, 2, 16),
access=AttrWriteType.READ_WRITE)
HBAT_PWR_on_R = attribute_wrapper(comms_annotation=["HBAT_PWR_on_R"], datatype=bool, dims=(96, 2, 16))
HBAT_PWR_on_RW = attribute_wrapper(comms_annotation=["HBAT_PWR_on_RW"], datatype=bool, dims=(96, 2, 16),
access=AttrWriteType.READ_WRITE)
RCU_ADC_locked_R = attribute_wrapper(comms_annotation=["RCU_ADC_locked_R"], datatype=bool, dims=(32, 3))
RCU_attenuator_dB_R = attribute_wrapper(comms_annotation=["RCU_attenuator_dB_R"], datatype=numpy.int64,
dims=(32, 3))
RCU_attenuator_dB_RW = attribute_wrapper(comms_annotation=["RCU_attenuator_dB_RW"], datatype=numpy.int64,
dims=(32, 3), access=AttrWriteType.READ_WRITE)
RCU_band_select_R = attribute_wrapper(comms_annotation=["RCU_band_select_R"], datatype=numpy.int64, dims=(32, 3))
RCU_band_select_RW = attribute_wrapper(comms_annotation=["RCU_band_select_RW"], datatype=numpy.int64, dims=(32, 3),
access=AttrWriteType.READ_WRITE)
RCU_DTH_freq_R = attribute_wrapper(comms_annotation=["RCU_DTH_freq_R"], datatype=numpy.int64, dims=(32, 3))
RCU_DTH_freq_RW = attribute_wrapper(comms_annotation=["RCU_DTH_freq_RW"], datatype=numpy.int64, dims=(32, 3),
access=AttrWriteType.READ_WRITE)
RCU_DTH_on_R = attribute_wrapper(comms_annotation=["RCU_DTH_on_R"], datatype=bool, dims=(32, 3))
RCU_LED_green_on_R = attribute_wrapper(comms_annotation=["RCU_LED_green_on_R"], datatype=bool, dims=(32,))
RCU_LED_green_on_RW = attribute_wrapper(comms_annotation=["RCU_LED_green_on_RW" ],datatype=bool , dims=(32,), access=AttrWriteType.READ_WRITE)
RCU_LED_green_on_RW = attribute_wrapper(comms_annotation=["RCU_LED_green_on_RW"], datatype=bool, dims=(32,),
access=AttrWriteType.READ_WRITE)
RCU_LED_red_on_R = attribute_wrapper(comms_annotation=["RCU_LED_red_on_R"], datatype=bool, dims=(32,))
RCU_LED_red_on_RW = attribute_wrapper(comms_annotation=["RCU_LED_red_on_RW" ],datatype=bool , dims=(32,), access=AttrWriteType.READ_WRITE)
RCU_mask_RW = attribute_wrapper(comms_annotation=["RCU_mask_RW" ],datatype=bool , dims=(32,), access=AttrWriteType.READ_WRITE)
RCU_LED_red_on_RW = attribute_wrapper(comms_annotation=["RCU_LED_red_on_RW"], datatype=bool, dims=(32,),
access=AttrWriteType.READ_WRITE)
RCU_mask_RW = attribute_wrapper(comms_annotation=["RCU_mask_RW"], datatype=bool, dims=(32,),
access=AttrWriteType.READ_WRITE)
RCU_PCB_ID_R = attribute_wrapper(comms_annotation=["RCU_PCB_ID_R"], datatype=numpy.int64, dims=(32,))
RCU_PCB_number_R = attribute_wrapper(comms_annotation=["RCU_PCB_number_R"], datatype=str, dims=(32,))
RCU_PCB_version_R = attribute_wrapper(comms_annotation=["RCU_PCB_version_R"], datatype=str, dims=(32,))
......@@ -167,16 +183,20 @@ class RECV(opcua_device):
RCU_PWR_2V5_R = attribute_wrapper(comms_annotation=["RCU_PWR_2V5_R"], datatype=numpy.float64, dims=(32,))
RCU_PWR_3V3_R = attribute_wrapper(comms_annotation=["RCU_PWR_3V3_R"], datatype=numpy.float64, dims=(32,))
RCU_PWR_ANALOG_on_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANALOG_on_R"], datatype=bool, dims=(32,))
RCU_PWR_ANT_IOUT_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_IOUT_R" ],datatype=numpy.float64, dims=(96,))
RCU_PWR_ANT_on_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_on_R" ],datatype=bool , dims=(96,))
RCU_PWR_ANT_on_RW = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_on_RW" ],datatype=bool , dims=(96,), access=AttrWriteType.READ_WRITE)
RCU_PWR_ANT_VIN_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_VIN_R" ],datatype=numpy.float64, dims=(96,))
RCU_PWR_ANT_VOUT_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_VOUT_R" ],datatype=numpy.float64, dims=(96,))
RCU_PWR_ANT_IOUT_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_IOUT_R"], datatype=numpy.float64,
dims=(32, 3))
RCU_PWR_ANT_on_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_on_R"], datatype=bool, dims=(32, 3))
RCU_PWR_ANT_on_RW = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_on_RW"], datatype=bool, dims=(32, 3),
access=AttrWriteType.READ_WRITE)
RCU_PWR_ANT_VIN_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_VIN_R"], datatype=numpy.float64, dims=(32, 3))
RCU_PWR_ANT_VOUT_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_VOUT_R"], datatype=numpy.float64,
dims=(32, 3))
RCU_PWR_DIGITAL_on_R = attribute_wrapper(comms_annotation=["RCU_PWR_DIGITAL_on_R"], datatype=bool, dims=(32,))
RCU_PWR_good_R = attribute_wrapper(comms_annotation=["RCU_PWR_good_R"], datatype=bool, dims=(32,))
RCU_TEMP_R = attribute_wrapper(comms_annotation=["RCU_TEMP_R"], datatype=numpy.float64, dims=(32,))
RECVTR_I2C_error_R = attribute_wrapper(comms_annotation=["RECVTR_I2C_error_R"], datatype=numpy.int64, dims=(32,))
RECVTR_monitor_rate_RW = attribute_wrapper(comms_annotation=["RECVTR_monitor_rate_RW" ],datatype=numpy.int64 , access=AttrWriteType.READ_WRITE)
RECVTR_monitor_rate_RW = attribute_wrapper(comms_annotation=["RECVTR_monitor_rate_RW"], datatype=numpy.int64,
access=AttrWriteType.READ_WRITE)
RECVTR_translator_busy_R = attribute_wrapper(comms_annotation=["RECVTR_translator_busy_R"], datatype=bool)
# ----------
......@@ -185,7 +205,8 @@ class RECV(opcua_device):
RCU_LED_colour_R = attribute(dtype=(numpy.uint32,), max_dim_x=32, fisallowed="is_attribute_access_allowed")
def read_RCU_LED_colour_R(self):
return (2 * self.read_attribute("RCU_LED_green_on_R") + 4 * self.read_attribute("RCU_LED_red_on_R")).astype(numpy.uint32)
return (2 * self.read_attribute("RCU_LED_green_on_R") + 4 * self.read_attribute("RCU_LED_red_on_R")).astype(
numpy.uint32)
RCU_error_R = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_access_allowed")
ANT_error_R = attribute(dtype=(bool,), max_dim_x=96, fisallowed="is_attribute_access_allowed")
......@@ -198,16 +219,17 @@ class RECV(opcua_device):
def read_ANT_error_R(self):
return self.read_attribute("ANT_mask_RW") & (
~self.read_attribute("RCU_ADC_locked_R")
~self.read_attribute("RCU_ADC_locked_R").flatten()
)
RECV_IOUT_error_R = attribute(dtype=(bool,), max_dim_x=96, fisallowed="is_attribute_access_allowed")
RECV_TEMP_error_R = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_access_allowed", polling_period=1000)
RECV_TEMP_error_R = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_access_allowed",
polling_period=1000)
RECV_VOUT_error_R = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_access_allowed")
def read_RECV_IOUT_error_R(self):
return self.read_attribute("ANT_mask_RW") & (
self.alarm_val("RCU_PWR_ANT_IOUT_R")
self.alarm_val("RCU_PWR_ANT_IOUT_R").flatten()
)
def read_RECV_TEMP_error_R(self):
......@@ -218,8 +240,8 @@ class RECV(opcua_device):
def read_RECV_VOUT_error_R(self):
return (self.read_attribute("ANT_mask_RW") & (
self.alarm_val("RCU_PWR_ANT_VIN_R")
| self.alarm_val("RCU_PWR_ANT_VOUT_R")
self.alarm_val("RCU_PWR_ANT_VIN_R").flatten()
| self.alarm_val("RCU_PWR_ANT_VOUT_R").flatten()
)).reshape(32, 3).any(axis=1) | (self.read_attribute("RCU_mask_RW") & (
self.alarm_val("RCU_PWR_1V8_R")
| self.alarm_val("RCU_PWR_2V5_R")
......@@ -295,7 +317,7 @@ class RECV(opcua_device):
def calculate_HBAT_bf_delay_steps(self, delays: numpy.ndarray):
""" converts a signal path delay (in seconds) to an analog beam weight """
# Reshape the flatten input array, into whatever how many tiles we get
# Reshape the flattened input array, into whatever how many tiles we get
delays = numpy.array(delays).reshape(-1, 16)
# Calculate the beam weight array
......@@ -348,6 +370,7 @@ class RECV(opcua_device):
"""
self.opcua_connection.call_method(["RCU_DTH_on"])
# ----------
# Run server
# ----------
......
......@@ -255,20 +255,20 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute("RCU_band_select_RW", [False] * 96)
self.recv_proxy.write_attribute("RCU_band_select_RW", [[False] * 3] * 32)
try:
antennafield_proxy.write_attribute(
"RCU_band_select_RW", [True] * 96
)
numpy.testing.assert_equal(
numpy.array([True] * 96),
numpy.array([[True] * 3] * 32),
self.recv_proxy.read_attribute("RCU_band_select_RW").value
)
finally:
# Always restore recv again
self.recv_proxy.write_attribute(
"RCU_band_select_RW", [False] * 96
"RCU_band_select_RW", [[False] * 3] * 32
)
# Verify device did not enter FAULT state
......
......@@ -92,15 +92,15 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase):
# Test attribute values retrieval
collector.parse_device_attributes()
numpy.testing.assert_equal(
collector.parameters["rcu_attenuator_dB"].flatten(),
collector.parameters["rcu_attenuator_dB"],
self.recv_proxy.rcu_attenuator_dB_r
)
numpy.testing.assert_equal(
collector.parameters["rcu_band_select"].flatten(),
collector.parameters["rcu_band_select"],
self.recv_proxy.rcu_band_select_r.tolist()
)
numpy.testing.assert_equal(
collector.parameters["rcu_dth_on"].flatten(),
collector.parameters["rcu_dth_on"],
self.recv_proxy.rcu_dth_on_r.tolist()
)
......
......@@ -224,11 +224,35 @@ class TestAntennaToRecvMapper(base.TestCase):
actual = mapper.map_write("ANT_mask_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_pwr_ant_on_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [None] * 48
expected = [[[None, None, None]] * 32]
actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_pwr_ant_on_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [None] * 48
expected = [[[None, None, None]] * 32] * 2
actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_pwr_ant_on_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [1, 0] + [None] * 46
expected = [[[0, 1, None]] + [[None, None, None]] * 31]
actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_band_select_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [None] * 48
expected = [[None] * 96]
expected = [[[None, None, None]] * 32]
actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual)
......@@ -236,7 +260,7 @@ class TestAntennaToRecvMapper(base.TestCase):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [None] * 48
expected = [[None] * 96] * 2
expected = [[[None, None, None]] * 32] * 2
actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual)
......@@ -244,7 +268,7 @@ class TestAntennaToRecvMapper(base.TestCase):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [1, 0] + [None] * 46
expected = [[0, 1] + [None] * 94]
expected = [[[0, 1, None]] + [[None, None, None]] * 31]
actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment