diff --git a/tangostationcontrol/tangostationcontrol/devices/antennafield.py b/tangostationcontrol/tangostationcontrol/devices/antennafield.py index 565f979539a3a70f538f39f880cd7efeae0102e4..86271d18c38805c0015a22d82459f85dcbec3bf1 100644 --- a/tangostationcontrol/tangostationcontrol/devices/antennafield.py +++ b/tangostationcontrol/tangostationcontrol/devices/antennafield.py @@ -9,6 +9,7 @@ from enum import IntEnum from math import pi import numpy +from typing import List # PyTango imports from tango import DeviceProxy, DevSource, AttrWriteType, DevVarFloatArray, DevVarLongArray @@ -384,7 +385,7 @@ class AntennaField(lofar_device): power_mapping = numpy.reshape(self.Power_to_RECV_mapping, (-1, 2)) self.__mapper = AntennaToRecvMapper(control_mapping, power_mapping, number_of_receivers) - def get_mapped_attribute(self, mapped_point): + def get_mapped_attribute(self, mapped_point: str): recv_results = [] for recv_proxy in self.recv_proxies: @@ -395,11 +396,14 @@ class AntennaField(lofar_device): return mapped_values - def set_mapped_attribute(self, mapped_point, value): + def set_mapped_attribute(self, mapped_point: str, value): mapped_value = self.__mapper.map_write(mapped_point, value) for idx, recv_proxy in enumerate(self.recv_proxies): - recv_proxy.write_attribute(mapped_point, mapped_value[idx]) + new_values = mapped_value[idx] + current_values = recv_proxy.read_attribute(mapped_point) + new_values = self.__mapper.merge_write(new_values, current_values) + recv_proxy.write_attribute(mapped_point, new_values) # -------- # Overloaded functions @@ -452,67 +456,109 @@ class AntennaField(lofar_device): return result_values.flatten() + class AntennaToRecvMapper(object): + + _VALUE_MAP_NONE_96 = numpy.full(96, None) + _VALUE_MAP_NONE_96_32 = numpy.full((96, 32), None) + def __init__(self, control_to_recv_mapping, power_to_recv_mapping, number_of_receivers): number_of_antennas = len(control_to_recv_mapping) - self.__control_mapping = control_to_recv_mapping - self.__power_mapping = power_to_recv_mapping - self.__number_of_receivers = number_of_receivers - self.__default_value_mapping_read = { - "ANT_mask_RW": numpy.full(number_of_antennas, False), - "RCU_PWR_ANT_on_R": numpy.full(number_of_antennas, False), - "RCU_PWR_ANT_on_RW": numpy.full(number_of_antennas, False), - "HBAT_BF_delay_steps_R": numpy.zeros([number_of_antennas,32], dtype=numpy.int64), - "HBAT_BF_delay_steps_RW": numpy.zeros([number_of_antennas,32], dtype=numpy.int64), - "HBAT_LED_on_R": numpy.full((number_of_antennas,32), False), - "HBAT_LED_on_RW": numpy.full((number_of_antennas,32), False), - "HBAT_PWR_LNA_on_R": numpy.full((number_of_antennas,32), False), - "HBAT_PWR_LNA_on_RW": numpy.full((number_of_antennas,32), False), - "HBAT_PWR_on_R": numpy.full((number_of_antennas,32), False), - "HBAT_PWR_on_RW": numpy.full((number_of_antennas,32), False), + # Reduce memory footprint of default values by creating single instance of common fields + value_map_ant_32_int = numpy.zeros([number_of_antennas, 32], dtype=numpy.int64) + value_map_ant_32_bool = numpy.full((number_of_antennas, 32), False) + value_map_ant_bool = numpy.full(number_of_antennas, False) + + self._control_mapping = control_to_recv_mapping + self._power_mapping = power_to_recv_mapping + self._number_of_receivers = number_of_receivers + self._default_value_mapping_read = { + "ANT_mask_RW": value_map_ant_bool, + "RCU_PWR_ANT_on_R": value_map_ant_bool, + "RCU_PWR_ANT_on_RW": value_map_ant_bool, + "HBAT_BF_delay_steps_R": value_map_ant_32_int, + "HBAT_BF_delay_steps_RW": value_map_ant_32_int, + "HBAT_LED_on_R": value_map_ant_32_bool, + "HBAT_LED_on_RW": value_map_ant_32_bool, + "HBAT_PWR_LNA_on_R": value_map_ant_32_bool, + "HBAT_PWR_LNA_on_RW": value_map_ant_32_bool, + "HBAT_PWR_on_R": value_map_ant_32_bool, + "HBAT_PWR_on_RW": value_map_ant_32_bool, "RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64) } - self.__default_value_mapping_write = { - "ANT_mask_RW": numpy.full(96, False), - "RCU_PWR_ANT_on_RW": numpy.full(96, False), - "HBAT_BF_delay_steps_RW": numpy.zeros([96,32], dtype=numpy.int64), - "HBAT_LED_on_RW": numpy.full((96,32), False), - "HBAT_PWR_LNA_on_RW": numpy.full((96,32), False), - "HBAT_PWR_on_RW": numpy.full((96,32), False), - "RCU_band_select_RW": numpy.zeros(96, dtype=numpy.int64) + self._masked_value_mapping_write = { + "ANT_mask_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, + "RCU_PWR_ANT_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, + "HBAT_BF_delay_steps_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32, + "HBAT_LED_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32, + "HBAT_PWR_LNA_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32, + "HBAT_PWR_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32, + "RCU_band_select_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, } - self.__reshape_attributes_in = { + self._reshape_attributes_in = { "HBAT_BF_delay_steps_RW": (96, 32), } - self.__reshape_attributes_out = { + self._reshape_attributes_out = { "HBAT_BF_delay_steps_RW": (96, 32), } - def map_read(self, mapped_attribute, recv_results): - default_values = self.__default_value_mapping_read[mapped_attribute] + def map_read(self, mapped_attribute: str, recv_results: List[any]): + """ - if mapped_attribute in self.__reshape_attributes_in: + :param mapped_attribute: attribute identifier as present in + py:attribute:`~_default_value_mapping_read` + :param recv_results: Results as gathered by appending attributes from RECV + devices + :return: + """ + + default_values = self._default_value_mapping_read[mapped_attribute] + + if mapped_attribute in self._reshape_attributes_in: recv_results = numpy.reshape(recv_results, - (self.__number_of_receivers,) + self.__reshape_attributes_in[mapped_attribute]) + (self._number_of_receivers,) + self._reshape_attributes_in[mapped_attribute]) return self._mapped_r_values(recv_results, default_values) - def map_write(self, mapped_attribute, set_values): - default_values = self.__default_value_mapping_write[mapped_attribute] + def map_write(self, mapped_attribute: str, set_values): + """Perform a mapped write for the attribute using the set_values + + :param mapped_attribute: attribute identifier as present in + py:attribute:`~_default_value_mapping_write` + :param set_values: + :return: + """ + + default_values = self._masked_value_mapping_write[mapped_attribute] mapped_values = self._mapped_rw_values(set_values, default_values) - if mapped_attribute in self.__reshape_attributes_out: + if mapped_attribute in self._reshape_attributes_out: mapped_values = numpy.reshape(mapped_values, - (self.__number_of_receivers,) + self.__reshape_attributes_out[mapped_attribute]) + (self._number_of_receivers,) + self._reshape_attributes_out[mapped_attribute]) return mapped_values + def merge_write(self, mapped_values, current_values): + """Merge values as retrieved from :py:func:`~map_write` with current_values + + To be used by the :py:class:`~AntennaField` device to remove None fields + from mapped_values with recently retrieved current_values from RECV device. + + :param mapped_values: + :param current_values: + :return: + """ + + for idx, value in enumerate(mapped_values): + if value is None: + mapped_values[idx] = current_values[idx] + def _mapped_r_values(self, recv_results, default_values): mapped_values = numpy.array(default_values) - for idx, mapping in enumerate(self.__control_mapping): + for idx, mapping in enumerate(self._control_mapping): recv = mapping[0] rcu = mapping[1] if recv > 0: @@ -523,16 +569,17 @@ class AntennaToRecvMapper(object): def _mapped_rw_values(self, set_values, default_values): mapped_values = [] - for _ in range(self.__number_of_receivers): + for _ in range(self._number_of_receivers): mapped_values.append(default_values) mapped_values = numpy.array(mapped_values) - for idx, mapping in enumerate(self.__control_mapping): + for idx, mapping in enumerate(self._control_mapping): recv = mapping[0] rcu = mapping[1] if recv > 0: mapped_values[recv - 1, rcu] = set_values[idx] + # import pdb; pdb.set_trace() return mapped_values # ---------- diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py index 2e4b798e52545e6e0e66b176a27da6c90314f4df..dbb2842bc81bedc78b93ec22763a3fb301048db9 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py @@ -190,50 +190,54 @@ class TestAntennaToRecvMapper(base.TestCase): # Rename to write def test_map_write_ant_mask_rw_no_mapping_and_one_receiver(self): + """Verify results None without control and array sizes""" + mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [False] * 48 - expected = [[False] * 96] + set_values = [None] * 48 + expected = [[None] * 96] actual = mapper.map_write("ANT_mask_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_ant_mask_rw_no_mapping_and_two_receivers(self): + """Verify results None without control and array sizes""" + mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [False] * 48 - expected = [[False] * 96] * 2 + set_values = [None] * 48 + expected = [[None] * 96] * 2 actual = mapper.map_write("ANT_mask_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_ant_mask_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [True, False] + [False] * 46 - expected = [[False, True, False] + [False] * 93] + set_values = [True, False] + [None] * 46 + expected = [[False, True] + [None] * 94] actual = mapper.map_write("ANT_mask_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_band_select_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [0] * 48 - expected = [[0] * 96] + set_values = [None] * 48 + expected = [[None] * 96] actual = mapper.map_write("RCU_band_select_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_band_select_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [0] * 48 - expected = [[0] * 96] * 2 + set_values = [None] * 48 + expected = [[None] * 96] * 2 actual = mapper.map_write("RCU_band_select_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_band_select_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [1, 0] + [0] * 46 - expected = [[0, 1, 0] + [0] * 93] + set_values = [1, 0] + [None] * 46 + expected = [[0, 1] + [None] * 94] actual = mapper.map_write("RCU_band_select_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -241,7 +245,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) set_values = [[1] * 32] * 48 - expected = [[[0] * 32] * 96] + expected = [[[None] * 32] * 96] actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -249,87 +253,87 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) set_values = [[1] * 32] * 48 - expected = [[[0] * 32] * 96, [[0] * 32] * 96] + expected = [[[None] * 32] * 96, [[None] * 32] * 96] actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_bf_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[1] * 32, [2] * 32] + [[0] * 32] * 46 - expected = [[[2] * 32, [1] * 32] + [[0] * 32] * 94] + set_values = [[1] * 32, [2] * 32] + [[None] * 32] * 46 + expected = [[[2] * 32, [1] * 32] + [[None] * 32] * 94] actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [[False] * 32] * 48 - expected = [[[False] * 32] * 96] + set_values = [[None] * 32] * 48 + expected = [[[None] * 32] * 96] actual = mapper.map_write("HBAT_LED_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [[False] * 32] * 48 - expected = [[[False] * 32] * 96, [[False] * 32] * 96] + set_values = [[None] * 32] * 48 + expected = [[[None] * 32] * 96, [[None] * 32] * 96] actual = mapper.map_write("HBAT_LED_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46 - expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94] + set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46 + expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94] actual = mapper.map_write("HBAT_LED_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [[False] * 32] * 48 - expected = [[[False] * 32] * 96] + set_values = [[None] * 32] * 48 + expected = [[[None] * 32] * 96] actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [[False] * 32] * 48 - expected = [[[False] * 32] * 96, [[False] * 32] * 96] + set_values = [[None] * 32] * 48 + expected = [[[None] * 32] * 96, [[None] * 32] * 96] actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46 - expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94] + set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46 + expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94] actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_on_rw_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [[False] * 32] * 48 - expected = [[[False] * 32] * 96] + set_values = [[None] * 32] * 48 + expected = [[[None] * 32] * 96] actual = mapper.map_write("HBAT_PWR_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_lna_on_rw_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [[False] * 32] * 48 - expected = [[[False] * 32] * 96, [[False] * 32] * 96] + set_values = [[None] * 32] * 48 + expected = [[[None] * 32] * 96, [[None] * 32] * 96] actual = mapper.map_write("HBAT_PWR_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46 - expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94] + set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46 + expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94] actual = mapper.map_write("HBAT_PWR_on_RW", set_values) numpy.testing.assert_equal(expected, actual)