Skip to content
Snippets Groups Projects
Commit 9b7098db authored by Corné Lukken's avatar Corné Lukken
Browse files

L2SS-876: Get current RECV values before updating through antennafield

parent 673aadb6
No related branches found
No related tags found
1 merge request!408L2SS-876: Get current RECV values before updating through antennafield
......@@ -9,6 +9,7 @@
from enum import IntEnum
from math import pi
import numpy
from typing import List
# PyTango imports
from tango import DeviceProxy, DevSource, AttrWriteType, DevVarFloatArray, DevVarLongArray
......@@ -384,7 +385,7 @@ class AntennaField(lofar_device):
power_mapping = numpy.reshape(self.Power_to_RECV_mapping, (-1, 2))
self.__mapper = AntennaToRecvMapper(control_mapping, power_mapping, number_of_receivers)
def get_mapped_attribute(self, mapped_point):
def get_mapped_attribute(self, mapped_point: str):
recv_results = []
for recv_proxy in self.recv_proxies:
......@@ -395,11 +396,14 @@ class AntennaField(lofar_device):
return mapped_values
def set_mapped_attribute(self, mapped_point, value):
def set_mapped_attribute(self, mapped_point: str, value):
mapped_value = self.__mapper.map_write(mapped_point, value)
for idx, recv_proxy in enumerate(self.recv_proxies):
recv_proxy.write_attribute(mapped_point, mapped_value[idx])
new_values = mapped_value[idx]
current_values = recv_proxy.read_attribute(mapped_point)
new_values = self.__mapper.merge_write(new_values, current_values)
recv_proxy.write_attribute(mapped_point, new_values)
# --------
# Overloaded functions
......@@ -452,67 +456,109 @@ class AntennaField(lofar_device):
return result_values.flatten()
class AntennaToRecvMapper(object):
_VALUE_MAP_NONE_96 = numpy.full(96, None)
_VALUE_MAP_NONE_96_32 = numpy.full((96, 32), None)
def __init__(self, control_to_recv_mapping, power_to_recv_mapping, number_of_receivers):
number_of_antennas = len(control_to_recv_mapping)
self.__control_mapping = control_to_recv_mapping
self.__power_mapping = power_to_recv_mapping
self.__number_of_receivers = number_of_receivers
self.__default_value_mapping_read = {
"ANT_mask_RW": numpy.full(number_of_antennas, False),
"RCU_PWR_ANT_on_R": numpy.full(number_of_antennas, False),
"RCU_PWR_ANT_on_RW": numpy.full(number_of_antennas, False),
"HBAT_BF_delay_steps_R": numpy.zeros([number_of_antennas,32], dtype=numpy.int64),
"HBAT_BF_delay_steps_RW": numpy.zeros([number_of_antennas,32], dtype=numpy.int64),
"HBAT_LED_on_R": numpy.full((number_of_antennas,32), False),
"HBAT_LED_on_RW": numpy.full((number_of_antennas,32), False),
"HBAT_PWR_LNA_on_R": numpy.full((number_of_antennas,32), False),
"HBAT_PWR_LNA_on_RW": numpy.full((number_of_antennas,32), False),
"HBAT_PWR_on_R": numpy.full((number_of_antennas,32), False),
"HBAT_PWR_on_RW": numpy.full((number_of_antennas,32), False),
# Reduce memory footprint of default values by creating single instance of common fields
value_map_ant_32_int = numpy.zeros([number_of_antennas, 32], dtype=numpy.int64)
value_map_ant_32_bool = numpy.full((number_of_antennas, 32), False)
value_map_ant_bool = numpy.full(number_of_antennas, False)
self._control_mapping = control_to_recv_mapping
self._power_mapping = power_to_recv_mapping
self._number_of_receivers = number_of_receivers
self._default_value_mapping_read = {
"ANT_mask_RW": value_map_ant_bool,
"RCU_PWR_ANT_on_R": value_map_ant_bool,
"RCU_PWR_ANT_on_RW": value_map_ant_bool,
"HBAT_BF_delay_steps_R": value_map_ant_32_int,
"HBAT_BF_delay_steps_RW": value_map_ant_32_int,
"HBAT_LED_on_R": value_map_ant_32_bool,
"HBAT_LED_on_RW": value_map_ant_32_bool,
"HBAT_PWR_LNA_on_R": value_map_ant_32_bool,
"HBAT_PWR_LNA_on_RW": value_map_ant_32_bool,
"HBAT_PWR_on_R": value_map_ant_32_bool,
"HBAT_PWR_on_RW": value_map_ant_32_bool,
"RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64)
}
self.__default_value_mapping_write = {
"ANT_mask_RW": numpy.full(96, False),
"RCU_PWR_ANT_on_RW": numpy.full(96, False),
"HBAT_BF_delay_steps_RW": numpy.zeros([96,32], dtype=numpy.int64),
"HBAT_LED_on_RW": numpy.full((96,32), False),
"HBAT_PWR_LNA_on_RW": numpy.full((96,32), False),
"HBAT_PWR_on_RW": numpy.full((96,32), False),
"RCU_band_select_RW": numpy.zeros(96, dtype=numpy.int64)
self._masked_value_mapping_write = {
"ANT_mask_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"RCU_PWR_ANT_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"HBAT_BF_delay_steps_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_LED_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_PWR_LNA_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_PWR_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"RCU_band_select_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
}
self.__reshape_attributes_in = {
self._reshape_attributes_in = {
"HBAT_BF_delay_steps_RW": (96, 32),
}
self.__reshape_attributes_out = {
self._reshape_attributes_out = {
"HBAT_BF_delay_steps_RW": (96, 32),
}
def map_read(self, mapped_attribute, recv_results):
default_values = self.__default_value_mapping_read[mapped_attribute]
def map_read(self, mapped_attribute: str, recv_results: List[any]):
"""
if mapped_attribute in self.__reshape_attributes_in:
:param mapped_attribute: attribute identifier as present in
py:attribute:`~_default_value_mapping_read`
:param recv_results: Results as gathered by appending attributes from RECV
devices
:return:
"""
default_values = self._default_value_mapping_read[mapped_attribute]
if mapped_attribute in self._reshape_attributes_in:
recv_results = numpy.reshape(recv_results,
(self.__number_of_receivers,) + self.__reshape_attributes_in[mapped_attribute])
(self._number_of_receivers,) + self._reshape_attributes_in[mapped_attribute])
return self._mapped_r_values(recv_results, default_values)
def map_write(self, mapped_attribute, set_values):
default_values = self.__default_value_mapping_write[mapped_attribute]
def map_write(self, mapped_attribute: str, set_values):
"""Perform a mapped write for the attribute using the set_values
:param mapped_attribute: attribute identifier as present in
py:attribute:`~_default_value_mapping_write`
:param set_values:
:return:
"""
default_values = self._masked_value_mapping_write[mapped_attribute]
mapped_values = self._mapped_rw_values(set_values, default_values)
if mapped_attribute in self.__reshape_attributes_out:
if mapped_attribute in self._reshape_attributes_out:
mapped_values = numpy.reshape(mapped_values,
(self.__number_of_receivers,) + self.__reshape_attributes_out[mapped_attribute])
(self._number_of_receivers,) + self._reshape_attributes_out[mapped_attribute])
return mapped_values
def merge_write(self, mapped_values, current_values):
"""Merge values as retrieved from :py:func:`~map_write` with current_values
To be used by the :py:class:`~AntennaField` device to remove None fields
from mapped_values with recently retrieved current_values from RECV device.
:param mapped_values:
:param current_values:
:return:
"""
for idx, value in enumerate(mapped_values):
if value is None:
mapped_values[idx] = current_values[idx]
def _mapped_r_values(self, recv_results, default_values):
mapped_values = numpy.array(default_values)
for idx, mapping in enumerate(self.__control_mapping):
for idx, mapping in enumerate(self._control_mapping):
recv = mapping[0]
rcu = mapping[1]
if recv > 0:
......@@ -523,16 +569,17 @@ class AntennaToRecvMapper(object):
def _mapped_rw_values(self, set_values, default_values):
mapped_values = []
for _ in range(self.__number_of_receivers):
for _ in range(self._number_of_receivers):
mapped_values.append(default_values)
mapped_values = numpy.array(mapped_values)
for idx, mapping in enumerate(self.__control_mapping):
for idx, mapping in enumerate(self._control_mapping):
recv = mapping[0]
rcu = mapping[1]
if recv > 0:
mapped_values[recv - 1, rcu] = set_values[idx]
# import pdb; pdb.set_trace()
return mapped_values
# ----------
......
......@@ -190,50 +190,54 @@ class TestAntennaToRecvMapper(base.TestCase):
# Rename to write
def test_map_write_ant_mask_rw_no_mapping_and_one_receiver(self):
"""Verify results None without control and array sizes"""
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [False] * 48
expected = [[False] * 96]
set_values = [None] * 48
expected = [[None] * 96]
actual = mapper.map_write("ANT_mask_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_ant_mask_rw_no_mapping_and_two_receivers(self):
"""Verify results None without control and array sizes"""
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [False] * 48
expected = [[False] * 96] * 2
set_values = [None] * 48
expected = [[None] * 96] * 2
actual = mapper.map_write("ANT_mask_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_ant_mask_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [True, False] + [False] * 46
expected = [[False, True, False] + [False] * 93]
set_values = [True, False] + [None] * 46
expected = [[False, True] + [None] * 94]
actual = mapper.map_write("ANT_mask_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_band_select_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [0] * 48
expected = [[0] * 96]
set_values = [None] * 48
expected = [[None] * 96]
actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_band_select_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [0] * 48
expected = [[0] * 96] * 2
set_values = [None] * 48
expected = [[None] * 96] * 2
actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_band_select_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [1, 0] + [0] * 46
expected = [[0, 1, 0] + [0] * 93]
set_values = [1, 0] + [None] * 46
expected = [[0, 1] + [None] * 94]
actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual)
......@@ -241,7 +245,7 @@ class TestAntennaToRecvMapper(base.TestCase):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [[1] * 32] * 48
expected = [[[0] * 32] * 96]
expected = [[[None] * 32] * 96]
actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values)
numpy.testing.assert_equal(expected, actual)
......@@ -249,87 +253,87 @@ class TestAntennaToRecvMapper(base.TestCase):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [[1] * 32] * 48
expected = [[[0] * 32] * 96, [[0] * 32] * 96]
expected = [[[None] * 32] * 96, [[None] * 32] * 96]
actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_bf_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [[1] * 32, [2] * 32] + [[0] * 32] * 46
expected = [[[2] * 32, [1] * 32] + [[0] * 32] * 94]
set_values = [[1] * 32, [2] * 32] + [[None] * 32] * 46
expected = [[[2] * 32, [1] * 32] + [[None] * 32] * 94]
actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96]
set_values = [[None] * 32] * 48
expected = [[[None] * 32] * 96]
actual = mapper.map_write("HBAT_LED_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96, [[False] * 32] * 96]
set_values = [[None] * 32] * 48
expected = [[[None] * 32] * 96, [[None] * 32] * 96]
actual = mapper.map_write("HBAT_LED_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94]
set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94]
actual = mapper.map_write("HBAT_LED_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96]
set_values = [[None] * 32] * 48
expected = [[[None] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96, [[False] * 32] * 96]
set_values = [[None] * 32] * 48
expected = [[[None] * 32] * 96, [[None] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94]
set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94]
actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_on_rw_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96]
set_values = [[None] * 32] * 48
expected = [[[None] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_lna_on_rw_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96, [[False] * 32] * 96]
set_values = [[None] * 32] * 48
expected = [[[None] * 32] * 96, [[None] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94]
set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment