Skip to content
Snippets Groups Projects

L2SS-1200: add rcu_id and rcu_version

1 unresolved thread
Merged Stefano Di Frischia requested to merge L2SS-1200-rcu-attributes-from-antennafield into master
1 unresolved thread
Files
2
@@ -206,44 +206,44 @@ class AntennaField(LOFARDevice):
)
Field_Attenuation = device_property(
doc=f"Attenuation value to apply on all inputs.",
doc="Attenuation value to apply on all inputs.",
dtype="DevFloat",
mandatory=False,
default_value=0.0,
)
Calibration_SDP_Subband_Weights_50MHz = device_property(
doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW "
f"columns of each polarisation of each antenna, at 50 MHz. Each "
f"polarisation is represented by a (real, imag) pair for every "
f"subband.",
doc="Measured calibration values for the sdp.FPGA_subband_weights_RW "
"columns of each polarisation of each antenna, at 50 MHz. Each "
"polarisation is represented by a (real, imag) pair for every "
"subband.",
dtype="DevVarFloatArray",
mandatory=False,
)
Calibration_SDP_Subband_Weights_150MHz = device_property(
doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW "
f"columns of each polarisation of each antenna, at 150 MHz. Each "
f"polarisation is represented by a (real, imag) pair for every "
f"subband.",
doc="Measured calibration values for the sdp.FPGA_subband_weights_RW "
"columns of each polarisation of each antenna, at 150 MHz. Each "
"polarisation is represented by a (real, imag) pair for every "
"subband.",
dtype="DevVarFloatArray",
mandatory=False,
)
Calibration_SDP_Subband_Weights_200MHz = device_property(
doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW "
f"columns of each polarisation of each antenna, at 200 MHz. Each "
f"polarisation is represented by a (real, imag) pair for every "
f"subband.",
doc="Measured calibration values for the sdp.FPGA_subband_weights_RW "
"columns of each polarisation of each antenna, at 200 MHz. Each "
"polarisation is represented by a (real, imag) pair for every "
"subband.",
dtype="DevVarFloatArray",
mandatory=False,
)
Calibration_SDP_Subband_Weights_250MHz = device_property(
doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW "
f"columns of each polarisation of each antenna, at 250 MHz. Each "
f"polarisation is represented by a (real, imag) pair for every "
f"subband.",
doc="Measured calibration values for the sdp.FPGA_subband_weights_RW "
"columns of each polarisation of each antenna, at 250 MHz. Each "
"polarisation is represented by a (real, imag) pair for every "
"subband.",
dtype="DevVarFloatArray",
mandatory=False,
)
@@ -423,13 +423,13 @@ class AntennaField(LOFARDevice):
max_dim_x=MAX_ANTENNA,
)
Antenna_Cables_Delay_R = attribute(
doc=f"Delay caused by the cable between antenna and RCU, in seconds.",
doc="Delay caused by the cable between antenna and RCU, in seconds.",
dtype=(numpy.float64,),
max_dim_x=MAX_ANTENNA,
unit="s",
)
Antenna_Cables_Loss_R = attribute(
doc=f"Loss caused by the cable between antenna and RCU, in dB.",
doc="Loss caused by the cable between antenna and RCU, in dB.",
dtype=(numpy.float64,),
max_dim_x=MAX_ANTENNA,
unit="dB",
@@ -438,43 +438,43 @@ class AntennaField(LOFARDevice):
# ----- Calibration information
Calibration_SDP_Signal_Input_Samples_Delay_R = attribute(
doc=f"Number of samples that each antenna signal should be delayed to line "
f"up. To be applied on sdp.FPGA_signal_input_samples_delay_RW.",
doc="Number of samples that each antenna signal should be delayed to line "
"up. To be applied on sdp.FPGA_signal_input_samples_delay_RW.",
dtype=(numpy.uint32,),
max_dim_x=MAX_ANTENNA,
unit="samples",
)
Calibration_RCU_Attenuation_dB_R = attribute(
doc=f"Amount of dB with which each antenna signal must be adjusted to line "
f"up. To be applied on recv.RCU_attenuator_dB_RW.",
doc="Amount of dB with which each antenna signal must be adjusted to line "
"up. To be applied on recv.RCU_attenuator_dB_RW.",
dtype=(numpy.uint32,),
max_dim_x=MAX_ANTENNA,
unit="dB",
)
Calibration_SDP_Fine_Calibration_Default_R = attribute(
doc=f"Computed calibration values for the fine calibration of each "
f"antenna. Each antenna is represented by a (delay, phase_offset, "
f"amplitude_scaling) triplet.",
doc="Computed calibration values for the fine calibration of each "
"antenna. Each antenna is represented by a (delay, phase_offset, "
"amplitude_scaling) triplet.",
dtype=((numpy.float64,),),
max_dim_y=MAX_ANTENNA * N_pol,
max_dim_x=3,
)
Calibration_SDP_Subband_Weights_Default_R = attribute(
doc=f"Calibration values for the rows in sdp.FPGA_subband_weights_RW "
f"relevant for our antennas, as computed. Each subband of each "
f"polarisation of each antenna is represented by a real_imag number "
f"(real, imag).",
doc="Calibration values for the rows in sdp.FPGA_subband_weights_RW "
"relevant for our antennas, as computed. Each subband of each "
"polarisation of each antenna is represented by a real_imag number "
"(real, imag).",
dtype=((numpy.float64,),),
max_dim_y=MAX_ANTENNA * N_pol,
max_dim_x=N_subbands * VALUES_PER_COMPLEX,
)
Calibration_SDP_Subband_Weights_R = attribute(
doc=f"Calibration values for the rows in sdp.FPGA_subband_weights_RW "
f"relevant for our antennas. Each subband of each polarisation of "
f"each antenna is represented by a real_imag number (real, imag). "
f"Returns the measured values from "
f"Calibration_SDP_Subband_Weights_XXXMHz if available, and values "
f"computed from Calibration_SDP_Fine_Calibration_Default_R otherwise.",
doc="Calibration values for the rows in sdp.FPGA_subband_weights_RW "
"relevant for our antennas. Each subband of each polarisation of "
"each antenna is represented by a real_imag number (real, imag). "
"Returns the measured values from "
"Calibration_SDP_Subband_Weights_XXXMHz if available, and values "
"computed from Calibration_SDP_Fine_Calibration_Default_R otherwise.",
dtype=((numpy.float64,),),
max_dim_y=MAX_ANTENNA * N_pol,
max_dim_x=N_subbands * VALUES_PER_COMPLEX,
@@ -643,7 +643,18 @@ class AntennaField(LOFARDevice):
max_dim_x=MAX_ANTENNA,
access=AttrWriteType.READ_WRITE,
)
RCU_PCB_ID_R = MappedAttribute(
"RCU_PCB_ID_R",
dtype=((numpy.int64,),),
max_dim_x=2,
max_dim_y=MAX_ANTENNA,
)
RCU_PCB_version_R = MappedAttribute(
"RCU_PCB_version_R",
dtype=((str,),),
max_dim_x=2,
max_dim_y=MAX_ANTENNA,
)
# ----- Attributes mapped on SDP
FPGA_sdp_info_observation_id_R = MappedAttribute(
@@ -1540,6 +1551,7 @@ class AntennaToSdpMapper(object):
class AntennaToRecvMapper(object):
_VALUE_MAP_NONE_96 = numpy.full(MAX_ANTENNA, None)
_VALUE_MAP_NONE_96_32 = numpy.full((MAX_ANTENNA, N_elements * N_pol), None)
_VALUE_MAP_NONE_96_2 = numpy.full((MAX_ANTENNA, 2), None)
def __init__(
self, control_to_recv_mapping, power_to_recv_mapping, number_of_receivers
@@ -1560,28 +1572,30 @@ class AntennaToRecvMapper(object):
self._power_mapping = power_to_recv_mapping
self._number_of_receivers = number_of_receivers
self._value_mapper = {
"ANT_mask_RW": self._control_mapping,
"HBAT_BF_delay_steps_R": self._control_mapping,
"HBAT_BF_delay_steps_RW": self._control_mapping,
"HBAT_LED_on_R": self._control_mapping,
"HBAT_LED_on_RW": self._control_mapping,
"HBAT_PWR_LNA_on_R": self._control_mapping,
"HBAT_PWR_LNA_on_RW": self._control_mapping,
"HBAT_PWR_on_R": self._control_mapping,
"HBAT_PWR_on_RW": self._control_mapping,
"RCU_PWR_ANT_on_R": self._power_mapping,
"RCU_PWR_ANT_on_RW": self._power_mapping,
"RCU_band_select_R": self._control_mapping,
"RCU_band_select_RW": self._control_mapping,
"RCU_attenuator_dB_R": self._control_mapping,
"RCU_attenuator_dB_RW": self._control_mapping,
"RCU_DTH_freq_R": self._control_mapping,
"RCU_DTH_freq_RW": self._control_mapping,
"RCU_DTH_on_R": self._control_mapping,
"RCU_DTH_PWR_R": self._control_mapping,
"RCU_DTH_PWR_RW": self._control_mapping,
"RCU_DAB_filter_on_R": self._control_mapping,
"RCU_DAB_filter_on_RW": self._control_mapping,
"ANT_mask_RW": [self._control_mapping],
"HBAT_BF_delay_steps_R": [self._control_mapping],
"HBAT_BF_delay_steps_RW": [self._control_mapping],
"HBAT_LED_on_R": [self._control_mapping],
"HBAT_LED_on_RW": [self._control_mapping],
"HBAT_PWR_LNA_on_R": [self._control_mapping],
"HBAT_PWR_LNA_on_RW": [self._control_mapping],
"HBAT_PWR_on_R": [self._control_mapping],
"HBAT_PWR_on_RW": [self._control_mapping],
"RCU_PWR_ANT_on_R": [self._power_mapping],
"RCU_PWR_ANT_on_RW": [self._power_mapping],
"RCU_band_select_R": [self._control_mapping],
"RCU_band_select_RW": [self._control_mapping],
"RCU_attenuator_dB_R": [self._control_mapping],
"RCU_attenuator_dB_RW": [self._control_mapping],
"RCU_DTH_freq_R": [self._control_mapping],
"RCU_DTH_freq_RW": [self._control_mapping],
"RCU_DTH_on_R": [self._control_mapping],
"RCU_PCB_ID_R": [self._control_mapping, self._power_mapping],
"RCU_PCB_version_R": [self._control_mapping, self._power_mapping],
"RCU_DTH_PWR_R": [self._control_mapping],
"RCU_DTH_PWR_RW": [self._control_mapping],
"RCU_DAB_filter_on_R": [self._control_mapping],
"RCU_DAB_filter_on_RW": [self._control_mapping],
}
self._default_value_mapping_read = {
"ANT_mask_RW": value_map_ant_bool,
@@ -1606,6 +1620,8 @@ class AntennaToRecvMapper(object):
"RCU_DTH_PWR_RW": numpy.zeros(number_of_antennas, dtype=numpy.float64),
"RCU_DAB_filter_on_R": value_map_ant_bool,
"RCU_DAB_filter_on_RW": value_map_ant_bool,
"RCU_PCB_ID_R": numpy.zeros((number_of_antennas, 2), dtype=numpy.int64),
"RCU_PCB_version_R": numpy.full((number_of_antennas, 2), "", dtype=str),
}
self._masked_value_mapping_write = {
"ANT_mask_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
@@ -1620,6 +1636,8 @@ class AntennaToRecvMapper(object):
"RCU_DTH_freq_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"RCU_DTH_PWR_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"RCU_DAB_filter_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"RCU_PCB_ID_R": AntennaToRecvMapper._VALUE_MAP_NONE_96_2,
"RCU_PCB_version_R": AntennaToRecvMapper._VALUE_MAP_NONE_96_2,
}
self._reshape_attributes_in = {
"ANT_mask_RW": (MAX_ANTENNA,),
@@ -1637,6 +1655,8 @@ class AntennaToRecvMapper(object):
"RCU_DTH_PWR_RW": (MAX_ANTENNA,),
"RCU_DAB_filter_on_R": (MAX_ANTENNA,),
"RCU_DAB_filter_on_RW": (MAX_ANTENNA,),
"RCU_PCB_ID_R": (MAX_ANTENNA,),
"RCU_PCB_version_R": (MAX_ANTENNA,),
}
self._reshape_attributes_out = {
"ANT_mask_RW": (N_rcu, N_rcu_inp),
@@ -1654,6 +1674,15 @@ class AntennaToRecvMapper(object):
"RCU_DTH_PWR_RW": (N_rcu, N_rcu_inp),
"RCU_DAB_filter_on_R": (N_rcu, N_rcu_inp),
"RCU_DAB_filter_on_RW": (N_rcu, N_rcu_inp),
"RCU_PCB_ID_R": (N_rcu,),
"RCU_PCB_version_R": (N_rcu,),
}
# Attributes which need to be reshaped with a copy of their values,
# because RECV original dimension < AntennaField mapped dimension
self._fill_attributes_in = {
"RCU_PCB_ID_R": N_rcu_inp,
"RCU_PCB_version_R": N_rcu_inp,
}
def map_read(self, mapped_attribute: str, recv_results: List[any]) -> List[any]:
@@ -1668,6 +1697,11 @@ class AntennaToRecvMapper(object):
default_values = self._default_value_mapping_read[mapped_attribute]
if mapped_attribute in self._fill_attributes_in:
recv_results = [
[x] * self._fill_attributes_in[mapped_attribute] for x in recv_results
]
if mapped_attribute in self._reshape_attributes_in:
recv_results = numpy.reshape(
recv_results,
@@ -1704,18 +1738,38 @@ class AntennaToRecvMapper(object):
return mapped_values
def _mapped_r_values(
self, recv_results: List[any], default_values: List[any], value_mapping
self,
recv_results: List[any],
default_values: List[any],
value_mapping: List[any],
):
"""Mapping for read using :py:attribute:`~_control_mapping` and shallow copy"""
mapped_values = numpy.array(default_values)
for idx, mapping in enumerate(value_mapping):
recv = mapping[0]
rcu = mapping[1]
if recv > 0:
mapped_values[idx] = recv_results[recv - 1][rcu]
# If mapping is based on only one map, then insert the value
# provided by the map
if len(value_mapping) == 1:
for idx, mapping in enumerate(value_mapping[0]):
recv = mapping[0]
rcu = mapping[1]
if recv > 0:
mapped_values[idx] = recv_results[recv - 1][rcu]
Please register or sign in to reply
# If mapping is based on both power and control maps
if len(value_mapping) == 2:
# Assuming mapper lists are always in the following order:
# [Control_Mapping, Power_Mapping]
[control_mapping, power_mapping] = value_mapping
for idx, mapping in enumerate(control_mapping):
# Store index and values of both mappings
[recv_control, rcu_control] = mapping
[recv_power, rcu_power] = power_mapping[idx]
# Insert the two values in the mapped array
# as (rcu_control_val, rcu_power_val)
if recv_control > 0:
mapped_values[idx][0] = recv_results[recv_control - 1][rcu_control]
if recv_power > 0:
mapped_values[idx][1] = recv_results[recv_power - 1][rcu_power]
return mapped_values
def _mapped_rw_values(
@@ -1729,7 +1783,7 @@ class AntennaToRecvMapper(object):
mapped_values.append(default_values)
mapped_values = numpy.array(mapped_values)
for idx, mapping in enumerate(value_mapping):
for idx, mapping in enumerate(value_mapping[0]):
recv = mapping[0]
rcu = mapping[1]
if recv > 0:
Loading