diff --git a/tangostationcontrol/tangostationcontrol/devices/antennafield.py b/tangostationcontrol/tangostationcontrol/devices/antennafield.py index 9491afbc3528fcaab0625c5588ac77e1881873a2..6eac0d1420522174fbe5dad558a304b51331e8a6 100644 --- a/tangostationcontrol/tangostationcontrol/devices/antennafield.py +++ b/tangostationcontrol/tangostationcontrol/devices/antennafield.py @@ -206,44 +206,44 @@ class AntennaField(LOFARDevice): ) Field_Attenuation = device_property( - doc=f"Attenuation value to apply on all inputs.", + doc="Attenuation value to apply on all inputs.", dtype="DevFloat", mandatory=False, default_value=0.0, ) Calibration_SDP_Subband_Weights_50MHz = device_property( - doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW " - f"columns of each polarisation of each antenna, at 50 MHz. Each " - f"polarisation is represented by a (real, imag) pair for every " - f"subband.", + doc="Measured calibration values for the sdp.FPGA_subband_weights_RW " + "columns of each polarisation of each antenna, at 50 MHz. Each " + "polarisation is represented by a (real, imag) pair for every " + "subband.", dtype="DevVarFloatArray", mandatory=False, ) Calibration_SDP_Subband_Weights_150MHz = device_property( - doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW " - f"columns of each polarisation of each antenna, at 150 MHz. Each " - f"polarisation is represented by a (real, imag) pair for every " - f"subband.", + doc="Measured calibration values for the sdp.FPGA_subband_weights_RW " + "columns of each polarisation of each antenna, at 150 MHz. Each " + "polarisation is represented by a (real, imag) pair for every " + "subband.", dtype="DevVarFloatArray", mandatory=False, ) Calibration_SDP_Subband_Weights_200MHz = device_property( - doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW " - f"columns of each polarisation of each antenna, at 200 MHz. Each " - f"polarisation is represented by a (real, imag) pair for every " - f"subband.", + doc="Measured calibration values for the sdp.FPGA_subband_weights_RW " + "columns of each polarisation of each antenna, at 200 MHz. Each " + "polarisation is represented by a (real, imag) pair for every " + "subband.", dtype="DevVarFloatArray", mandatory=False, ) Calibration_SDP_Subband_Weights_250MHz = device_property( - doc=f"Measured calibration values for the sdp.FPGA_subband_weights_RW " - f"columns of each polarisation of each antenna, at 250 MHz. Each " - f"polarisation is represented by a (real, imag) pair for every " - f"subband.", + doc="Measured calibration values for the sdp.FPGA_subband_weights_RW " + "columns of each polarisation of each antenna, at 250 MHz. Each " + "polarisation is represented by a (real, imag) pair for every " + "subband.", dtype="DevVarFloatArray", mandatory=False, ) @@ -423,13 +423,13 @@ class AntennaField(LOFARDevice): max_dim_x=MAX_ANTENNA, ) Antenna_Cables_Delay_R = attribute( - doc=f"Delay caused by the cable between antenna and RCU, in seconds.", + doc="Delay caused by the cable between antenna and RCU, in seconds.", dtype=(numpy.float64,), max_dim_x=MAX_ANTENNA, unit="s", ) Antenna_Cables_Loss_R = attribute( - doc=f"Loss caused by the cable between antenna and RCU, in dB.", + doc="Loss caused by the cable between antenna and RCU, in dB.", dtype=(numpy.float64,), max_dim_x=MAX_ANTENNA, unit="dB", @@ -438,43 +438,43 @@ class AntennaField(LOFARDevice): # ----- Calibration information Calibration_SDP_Signal_Input_Samples_Delay_R = attribute( - doc=f"Number of samples that each antenna signal should be delayed to line " - f"up. To be applied on sdp.FPGA_signal_input_samples_delay_RW.", + doc="Number of samples that each antenna signal should be delayed to line " + "up. To be applied on sdp.FPGA_signal_input_samples_delay_RW.", dtype=(numpy.uint32,), max_dim_x=MAX_ANTENNA, unit="samples", ) Calibration_RCU_Attenuation_dB_R = attribute( - doc=f"Amount of dB with which each antenna signal must be adjusted to line " - f"up. To be applied on recv.RCU_attenuator_dB_RW.", + doc="Amount of dB with which each antenna signal must be adjusted to line " + "up. To be applied on recv.RCU_attenuator_dB_RW.", dtype=(numpy.uint32,), max_dim_x=MAX_ANTENNA, unit="dB", ) Calibration_SDP_Fine_Calibration_Default_R = attribute( - doc=f"Computed calibration values for the fine calibration of each " - f"antenna. Each antenna is represented by a (delay, phase_offset, " - f"amplitude_scaling) triplet.", + doc="Computed calibration values for the fine calibration of each " + "antenna. Each antenna is represented by a (delay, phase_offset, " + "amplitude_scaling) triplet.", dtype=((numpy.float64,),), max_dim_y=MAX_ANTENNA * N_pol, max_dim_x=3, ) Calibration_SDP_Subband_Weights_Default_R = attribute( - doc=f"Calibration values for the rows in sdp.FPGA_subband_weights_RW " - f"relevant for our antennas, as computed. Each subband of each " - f"polarisation of each antenna is represented by a real_imag number " - f"(real, imag).", + doc="Calibration values for the rows in sdp.FPGA_subband_weights_RW " + "relevant for our antennas, as computed. Each subband of each " + "polarisation of each antenna is represented by a real_imag number " + "(real, imag).", dtype=((numpy.float64,),), max_dim_y=MAX_ANTENNA * N_pol, max_dim_x=N_subbands * VALUES_PER_COMPLEX, ) Calibration_SDP_Subband_Weights_R = attribute( - doc=f"Calibration values for the rows in sdp.FPGA_subband_weights_RW " - f"relevant for our antennas. Each subband of each polarisation of " - f"each antenna is represented by a real_imag number (real, imag). " - f"Returns the measured values from " - f"Calibration_SDP_Subband_Weights_XXXMHz if available, and values " - f"computed from Calibration_SDP_Fine_Calibration_Default_R otherwise.", + doc="Calibration values for the rows in sdp.FPGA_subband_weights_RW " + "relevant for our antennas. Each subband of each polarisation of " + "each antenna is represented by a real_imag number (real, imag). " + "Returns the measured values from " + "Calibration_SDP_Subband_Weights_XXXMHz if available, and values " + "computed from Calibration_SDP_Fine_Calibration_Default_R otherwise.", dtype=((numpy.float64,),), max_dim_y=MAX_ANTENNA * N_pol, max_dim_x=N_subbands * VALUES_PER_COMPLEX, @@ -643,7 +643,18 @@ class AntennaField(LOFARDevice): max_dim_x=MAX_ANTENNA, access=AttrWriteType.READ_WRITE, ) - + RCU_PCB_ID_R = MappedAttribute( + "RCU_PCB_ID_R", + dtype=((numpy.int64,),), + max_dim_x=2, + max_dim_y=MAX_ANTENNA, + ) + RCU_PCB_version_R = MappedAttribute( + "RCU_PCB_version_R", + dtype=((str,),), + max_dim_x=2, + max_dim_y=MAX_ANTENNA, + ) # ----- Attributes mapped on SDP FPGA_sdp_info_observation_id_R = MappedAttribute( @@ -1540,6 +1551,7 @@ class AntennaToSdpMapper(object): class AntennaToRecvMapper(object): _VALUE_MAP_NONE_96 = numpy.full(MAX_ANTENNA, None) _VALUE_MAP_NONE_96_32 = numpy.full((MAX_ANTENNA, N_elements * N_pol), None) + _VALUE_MAP_NONE_96_2 = numpy.full((MAX_ANTENNA, 2), None) def __init__( self, control_to_recv_mapping, power_to_recv_mapping, number_of_receivers @@ -1560,28 +1572,30 @@ class AntennaToRecvMapper(object): self._power_mapping = power_to_recv_mapping self._number_of_receivers = number_of_receivers self._value_mapper = { - "ANT_mask_RW": self._control_mapping, - "HBAT_BF_delay_steps_R": self._control_mapping, - "HBAT_BF_delay_steps_RW": self._control_mapping, - "HBAT_LED_on_R": self._control_mapping, - "HBAT_LED_on_RW": self._control_mapping, - "HBAT_PWR_LNA_on_R": self._control_mapping, - "HBAT_PWR_LNA_on_RW": self._control_mapping, - "HBAT_PWR_on_R": self._control_mapping, - "HBAT_PWR_on_RW": self._control_mapping, - "RCU_PWR_ANT_on_R": self._power_mapping, - "RCU_PWR_ANT_on_RW": self._power_mapping, - "RCU_band_select_R": self._control_mapping, - "RCU_band_select_RW": self._control_mapping, - "RCU_attenuator_dB_R": self._control_mapping, - "RCU_attenuator_dB_RW": self._control_mapping, - "RCU_DTH_freq_R": self._control_mapping, - "RCU_DTH_freq_RW": self._control_mapping, - "RCU_DTH_on_R": self._control_mapping, - "RCU_DTH_PWR_R": self._control_mapping, - "RCU_DTH_PWR_RW": self._control_mapping, - "RCU_DAB_filter_on_R": self._control_mapping, - "RCU_DAB_filter_on_RW": self._control_mapping, + "ANT_mask_RW": [self._control_mapping], + "HBAT_BF_delay_steps_R": [self._control_mapping], + "HBAT_BF_delay_steps_RW": [self._control_mapping], + "HBAT_LED_on_R": [self._control_mapping], + "HBAT_LED_on_RW": [self._control_mapping], + "HBAT_PWR_LNA_on_R": [self._control_mapping], + "HBAT_PWR_LNA_on_RW": [self._control_mapping], + "HBAT_PWR_on_R": [self._control_mapping], + "HBAT_PWR_on_RW": [self._control_mapping], + "RCU_PWR_ANT_on_R": [self._power_mapping], + "RCU_PWR_ANT_on_RW": [self._power_mapping], + "RCU_band_select_R": [self._control_mapping], + "RCU_band_select_RW": [self._control_mapping], + "RCU_attenuator_dB_R": [self._control_mapping], + "RCU_attenuator_dB_RW": [self._control_mapping], + "RCU_DTH_freq_R": [self._control_mapping], + "RCU_DTH_freq_RW": [self._control_mapping], + "RCU_DTH_on_R": [self._control_mapping], + "RCU_PCB_ID_R": [self._control_mapping, self._power_mapping], + "RCU_PCB_version_R": [self._control_mapping, self._power_mapping], + "RCU_DTH_PWR_R": [self._control_mapping], + "RCU_DTH_PWR_RW": [self._control_mapping], + "RCU_DAB_filter_on_R": [self._control_mapping], + "RCU_DAB_filter_on_RW": [self._control_mapping], } self._default_value_mapping_read = { "ANT_mask_RW": value_map_ant_bool, @@ -1606,6 +1620,8 @@ class AntennaToRecvMapper(object): "RCU_DTH_PWR_RW": numpy.zeros(number_of_antennas, dtype=numpy.float64), "RCU_DAB_filter_on_R": value_map_ant_bool, "RCU_DAB_filter_on_RW": value_map_ant_bool, + "RCU_PCB_ID_R": numpy.zeros((number_of_antennas, 2), dtype=numpy.int64), + "RCU_PCB_version_R": numpy.full((number_of_antennas, 2), "", dtype=str), } self._masked_value_mapping_write = { "ANT_mask_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, @@ -1620,6 +1636,8 @@ class AntennaToRecvMapper(object): "RCU_DTH_freq_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, "RCU_DTH_PWR_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, "RCU_DAB_filter_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, + "RCU_PCB_ID_R": AntennaToRecvMapper._VALUE_MAP_NONE_96_2, + "RCU_PCB_version_R": AntennaToRecvMapper._VALUE_MAP_NONE_96_2, } self._reshape_attributes_in = { "ANT_mask_RW": (MAX_ANTENNA,), @@ -1637,6 +1655,8 @@ class AntennaToRecvMapper(object): "RCU_DTH_PWR_RW": (MAX_ANTENNA,), "RCU_DAB_filter_on_R": (MAX_ANTENNA,), "RCU_DAB_filter_on_RW": (MAX_ANTENNA,), + "RCU_PCB_ID_R": (MAX_ANTENNA,), + "RCU_PCB_version_R": (MAX_ANTENNA,), } self._reshape_attributes_out = { "ANT_mask_RW": (N_rcu, N_rcu_inp), @@ -1654,6 +1674,15 @@ class AntennaToRecvMapper(object): "RCU_DTH_PWR_RW": (N_rcu, N_rcu_inp), "RCU_DAB_filter_on_R": (N_rcu, N_rcu_inp), "RCU_DAB_filter_on_RW": (N_rcu, N_rcu_inp), + "RCU_PCB_ID_R": (N_rcu,), + "RCU_PCB_version_R": (N_rcu,), + } + + # Attributes which need to be reshaped with a copy of their values, + # because RECV original dimension < AntennaField mapped dimension + self._fill_attributes_in = { + "RCU_PCB_ID_R": N_rcu_inp, + "RCU_PCB_version_R": N_rcu_inp, } def map_read(self, mapped_attribute: str, recv_results: List[any]) -> List[any]: @@ -1668,6 +1697,11 @@ class AntennaToRecvMapper(object): default_values = self._default_value_mapping_read[mapped_attribute] + if mapped_attribute in self._fill_attributes_in: + recv_results = [ + [x] * self._fill_attributes_in[mapped_attribute] for x in recv_results + ] + if mapped_attribute in self._reshape_attributes_in: recv_results = numpy.reshape( recv_results, @@ -1704,18 +1738,38 @@ class AntennaToRecvMapper(object): return mapped_values def _mapped_r_values( - self, recv_results: List[any], default_values: List[any], value_mapping + self, + recv_results: List[any], + default_values: List[any], + value_mapping: List[any], ): """Mapping for read using :py:attribute:`~_control_mapping` and shallow copy""" mapped_values = numpy.array(default_values) - for idx, mapping in enumerate(value_mapping): - recv = mapping[0] - rcu = mapping[1] - if recv > 0: - mapped_values[idx] = recv_results[recv - 1][rcu] - + # If mapping is based on only one map, then insert the value + # provided by the map + if len(value_mapping) == 1: + for idx, mapping in enumerate(value_mapping[0]): + recv = mapping[0] + rcu = mapping[1] + if recv > 0: + mapped_values[idx] = recv_results[recv - 1][rcu] + # If mapping is based on both power and control maps + if len(value_mapping) == 2: + # Assuming mapper lists are always in the following order: + # [Control_Mapping, Power_Mapping] + [control_mapping, power_mapping] = value_mapping + for idx, mapping in enumerate(control_mapping): + # Store index and values of both mappings + [recv_control, rcu_control] = mapping + [recv_power, rcu_power] = power_mapping[idx] + # Insert the two values in the mapped array + # as (rcu_control_val, rcu_power_val) + if recv_control > 0: + mapped_values[idx][0] = recv_results[recv_control - 1][rcu_control] + if recv_power > 0: + mapped_values[idx][1] = recv_results[recv_power - 1][rcu_power] return mapped_values def _mapped_rw_values( @@ -1729,7 +1783,7 @@ class AntennaToRecvMapper(object): mapped_values.append(default_values) mapped_values = numpy.array(mapped_values) - for idx, mapping in enumerate(value_mapping): + for idx, mapping in enumerate(value_mapping[0]): recv = mapping[0] rcu = mapping[1] if recv > 0: diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py index 231ef5187742f46d4b895586ba6622b00fe3b5ce..15eed2ec48d984ef0acc64a5913860aba520ce3d 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py @@ -44,6 +44,9 @@ class TestAntennaToRecvMapper(base.TestCase): CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * ( DEFAULT_N_HBA_TILES - 2 ) + CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1 = [[1, 3], [1, 2]] + [[-1, -1]] * ( + DEFAULT_N_HBA_TILES - 2 + ) # A mapping where Antennas are all not mapped to SDP FPGAs FPGA_NOT_CONNECTED = numpy.reshape( @@ -405,6 +408,55 @@ class TestAntennaToRecvMapper(base.TestCase): actual = mapper.map_read("HBAT_PWR_on_RW", receiver_values) numpy.testing.assert_equal(expected, actual) + def test_map_read_rcu_id_unmapped(self): + """Test whether RCU_PCB_ID_R is correctly read with no mapping""" + mapper = AntennaToRecvMapper( + self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 3 + ) + receiver_values = [list(range(32)), [0] * 32, [0] * 32] + expected = [[0] * 2] * DEFAULT_N_HBA_TILES + actual = mapper.map_read("RCU_PCB_ID_R", receiver_values) + numpy.testing.assert_equal(expected, actual) + + def test_map_read_rcu_id_control_connected_and_power_disconnected(self): + """Test whether RCU_PCB_ID_R is correctly read with control mapping + and no power mapping""" + mapper = AntennaToRecvMapper( + self.CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1, + self.POWER_NOT_CONNECTED, + 3, + ) + receiver_values = [list(range(32)), [0] * 32, [0] * 32] + expected = [[3, 0], [2, 0]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) + actual = mapper.map_read("RCU_PCB_ID_R", receiver_values) + numpy.testing.assert_equal(expected, actual) + + def test_map_read_rcu_id_control_disconnected_and_power_connected(self): + """Test whether RCU_PCB_ID_R is correctly read with power mapping + and no control mapping""" + mapper = AntennaToRecvMapper( + self.CONTROL_NOT_CONNECTED, + self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + 3, + ) + receiver_values = [list(range(32)), [0] * 32, [0] * 32] + expected = [[0, 1], [0, 0]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) + actual = mapper.map_read("RCU_PCB_ID_R", receiver_values) + numpy.testing.assert_equal(expected, actual) + + def test_map_read_rcu_id_control_and_power_connected(self): + """Test whether RCU_PCB_ID_R is correctly read with control mapping + and no power mapping""" + mapper = AntennaToRecvMapper( + self.CONTROL_HBA_0_AND_1_ON_RCU_3_AND_2_OF_RECV_1, + self.POWER_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, + 3, + ) + receiver_values = [list(range(32)), [0] * 32, [0] * 32] + expected = [[3, 1], [2, 0]] + [[0, 0]] * (DEFAULT_N_HBA_TILES - 2) + actual = mapper.map_read("RCU_PCB_ID_R", receiver_values) + numpy.testing.assert_equal(expected, actual) + # Rename to write def test_map_write_fpga_sdp_info_observation_id_no_mapping(self):