diff --git a/tangostationcontrol/__init__.py b/tangostationcontrol/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tangostationcontrol/tangostationcontrol/common/type_checking.py b/tangostationcontrol/tangostationcontrol/common/type_checking.py new file mode 100644 index 0000000000000000000000000000000000000000..e896e708c757da78331e0aa5655f3c8e7a66bfda --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/common/type_checking.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +from collections.abc import Sequence + +import numpy + + +def is_sequence(obj): + """Identify sequences / collections""" + return isinstance(obj, Sequence) or isinstance(obj, numpy.ndarray) + + +def sequence_not_str(obj): + """Separate sequences / collections from str, byte or bytearray""" + return is_sequence(obj) and not isinstance(obj, (str, bytes, bytearray)) + + +def type_not_sequence(obj): + """Separate sequences / collections from types""" + return not is_sequence(obj) and isinstance(obj, type) diff --git a/tangostationcontrol/tangostationcontrol/devices/antennafield.py b/tangostationcontrol/tangostationcontrol/devices/antennafield.py index 565f979539a3a70f538f39f880cd7efeae0102e4..93e1a8c67e4e9cad3babce6acfe2e17a0fd0dc13 100644 --- a/tangostationcontrol/tangostationcontrol/devices/antennafield.py +++ b/tangostationcontrol/tangostationcontrol/devices/antennafield.py @@ -9,12 +9,15 @@ from enum import IntEnum from math import pi import numpy +from typing import List # PyTango imports from tango import DeviceProxy, DevSource, AttrWriteType, DevVarFloatArray, DevVarLongArray from tango.server import device_property, attribute, command # Additional import +from tangostationcontrol.common.type_checking import sequence_not_str +from tangostationcontrol.common.type_checking import type_not_sequence from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.devices.lofar_device import lofar_device from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions @@ -47,7 +50,10 @@ class mapped_attribute(attribute): if access == AttrWriteType.READ_WRITE: @fault_on_error() def write_func_wrapper(device, value): - write_func = device.set_mapped_attribute(mapping_attribute, value) + cast_type = dtype + while not type_not_sequence(cast_type): + cast_type = cast_type[0] + write_func = device.set_mapped_attribute(mapping_attribute, value, cast_type) self.fset = write_func_wrapper @@ -296,7 +302,7 @@ class AntennaField(lofar_device): antennas_auto_on = numpy.logical_and(use == AntennaUse.AUTO, quality <= AntennaQuality.SUSPICIOUS) return numpy.logical_or(antennas_forced_on, antennas_auto_on) - + def read_nr_antennas_R(self): # The number of antennas should be equal to: # * the number of elements in the Control_to_RECV_mapping (after reshaping), @@ -384,7 +390,7 @@ class AntennaField(lofar_device): power_mapping = numpy.reshape(self.Power_to_RECV_mapping, (-1, 2)) self.__mapper = AntennaToRecvMapper(control_mapping, power_mapping, number_of_receivers) - def get_mapped_attribute(self, mapped_point): + def get_mapped_attribute(self, mapped_point: str): recv_results = [] for recv_proxy in self.recv_proxies: @@ -395,11 +401,24 @@ class AntennaField(lofar_device): return mapped_values - def set_mapped_attribute(self, mapped_point, value): + def set_mapped_attribute(self, mapped_point: str, value, cast_type: type): + """Set the attribute to new value only for controlled points + + :warning: This method is susceptible to a lost update race condition if the + attribute on the RECV device is written to in between `read_attribute` + and `write_attribute`! + + """ + mapped_value = self.__mapper.map_write(mapped_point, value) for idx, recv_proxy in enumerate(self.recv_proxies): - recv_proxy.write_attribute(mapped_point, mapped_value[idx]) + new_values = mapped_value[idx] + + # TODO(Corne): Resolve potential lost update race condition + current_values = recv_proxy.read_attribute(mapped_point).value + self.__mapper.merge_write(new_values, current_values) + recv_proxy.write_attribute(mapped_point, new_values.astype(dtype=cast_type)) # -------- # Overloaded functions @@ -452,67 +471,115 @@ class AntennaField(lofar_device): return result_values.flatten() + class AntennaToRecvMapper(object): + + _VALUE_MAP_NONE_96 = numpy.full(96, None) + _VALUE_MAP_NONE_96_32 = numpy.full((96, 32), None) + def __init__(self, control_to_recv_mapping, power_to_recv_mapping, number_of_receivers): number_of_antennas = len(control_to_recv_mapping) - self.__control_mapping = control_to_recv_mapping - self.__power_mapping = power_to_recv_mapping - self.__number_of_receivers = number_of_receivers - self.__default_value_mapping_read = { - "ANT_mask_RW": numpy.full(number_of_antennas, False), - "RCU_PWR_ANT_on_R": numpy.full(number_of_antennas, False), - "RCU_PWR_ANT_on_RW": numpy.full(number_of_antennas, False), - "HBAT_BF_delay_steps_R": numpy.zeros([number_of_antennas,32], dtype=numpy.int64), - "HBAT_BF_delay_steps_RW": numpy.zeros([number_of_antennas,32], dtype=numpy.int64), - "HBAT_LED_on_R": numpy.full((number_of_antennas,32), False), - "HBAT_LED_on_RW": numpy.full((number_of_antennas,32), False), - "HBAT_PWR_LNA_on_R": numpy.full((number_of_antennas,32), False), - "HBAT_PWR_LNA_on_RW": numpy.full((number_of_antennas,32), False), - "HBAT_PWR_on_R": numpy.full((number_of_antennas,32), False), - "HBAT_PWR_on_RW": numpy.full((number_of_antennas,32), False), + # Reduce memory footprint of default values by creating single instance of + # common fields + value_map_ant_32_int = numpy.zeros([number_of_antennas, 32], dtype=numpy.int64) + value_map_ant_32_bool = numpy.full((number_of_antennas, 32), False) + value_map_ant_bool = numpy.full(number_of_antennas, False) + + self._control_mapping = control_to_recv_mapping + self._power_mapping = power_to_recv_mapping + self._number_of_receivers = number_of_receivers + self._default_value_mapping_read = { + "ANT_mask_RW": value_map_ant_bool, + "RCU_PWR_ANT_on_R": value_map_ant_bool, + "RCU_PWR_ANT_on_RW": value_map_ant_bool, + "HBAT_BF_delay_steps_R": value_map_ant_32_int, + "HBAT_BF_delay_steps_RW": value_map_ant_32_int, + "HBAT_LED_on_R": value_map_ant_32_bool, + "HBAT_LED_on_RW": value_map_ant_32_bool, + "HBAT_PWR_LNA_on_R": value_map_ant_32_bool, + "HBAT_PWR_LNA_on_RW": value_map_ant_32_bool, + "HBAT_PWR_on_R": value_map_ant_32_bool, + "HBAT_PWR_on_RW": value_map_ant_32_bool, "RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64) } - self.__default_value_mapping_write = { - "ANT_mask_RW": numpy.full(96, False), - "RCU_PWR_ANT_on_RW": numpy.full(96, False), - "HBAT_BF_delay_steps_RW": numpy.zeros([96,32], dtype=numpy.int64), - "HBAT_LED_on_RW": numpy.full((96,32), False), - "HBAT_PWR_LNA_on_RW": numpy.full((96,32), False), - "HBAT_PWR_on_RW": numpy.full((96,32), False), - "RCU_band_select_RW": numpy.zeros(96, dtype=numpy.int64) + self._masked_value_mapping_write = { + "ANT_mask_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, + "RCU_PWR_ANT_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, + "HBAT_BF_delay_steps_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32, + "HBAT_LED_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32, + "HBAT_PWR_LNA_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32, + "HBAT_PWR_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32, + "RCU_band_select_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, } - self.__reshape_attributes_in = { + self._reshape_attributes_in = { "HBAT_BF_delay_steps_RW": (96, 32), } - self.__reshape_attributes_out = { + self._reshape_attributes_out = { "HBAT_BF_delay_steps_RW": (96, 32), } - def map_read(self, mapped_attribute, recv_results): - default_values = self.__default_value_mapping_read[mapped_attribute] + def map_read(self, mapped_attribute: str, recv_results: List[any]) -> List[any]: + """Perform a mapped read for the attribute using the recv_results + + :param mapped_attribute: attribute identifier as present in + py:attribute:`~_default_value_mapping_read` + :param recv_results: Results as gathered by appending attributes from RECV + devices + :return: recv_results as mapped given attribute dimensions and control mapping + """ - if mapped_attribute in self.__reshape_attributes_in: + default_values = self._default_value_mapping_read[mapped_attribute] + + if mapped_attribute in self._reshape_attributes_in: recv_results = numpy.reshape(recv_results, - (self.__number_of_receivers,) + self.__reshape_attributes_in[mapped_attribute]) + (self._number_of_receivers,) + self._reshape_attributes_in[mapped_attribute]) return self._mapped_r_values(recv_results, default_values) - def map_write(self, mapped_attribute, set_values): - default_values = self.__default_value_mapping_write[mapped_attribute] + def map_write(self, mapped_attribute: str, set_values: List[any]) -> List[any]: + """Perform a mapped write for the attribute using the set_values + + :param mapped_attribute: attribute identifier as present in + py:attribute:`~_default_value_mapping_write` + :param set_values: The values to be set for the specified attribute + :return: set_values as mapped given attribute dimensions and control mapping + """ + + default_values = self._masked_value_mapping_write[mapped_attribute] mapped_values = self._mapped_rw_values(set_values, default_values) - if mapped_attribute in self.__reshape_attributes_out: + if mapped_attribute in self._reshape_attributes_out: mapped_values = numpy.reshape(mapped_values, - (self.__number_of_receivers,) + self.__reshape_attributes_out[mapped_attribute]) + (self._number_of_receivers,) + self._reshape_attributes_out[mapped_attribute]) return mapped_values - def _mapped_r_values(self, recv_results, default_values): + def merge_write(self, merge_values: List[any], current_values: List[any]): + """Merge values as retrieved from :py:func:`~map_write` with current_values + + This method will modify the contents of merge_values. + + To be used by the :py:class:`~AntennaField` device to remove None fields + from mapped_values with recently retrieved current_values from RECV device. + + :param merge_values: values as retrieved from :py:func:`~map_write` + :param current_values: values retrieved from RECV device on specific attribute + """ + + for idx, value in enumerate(merge_values): + if sequence_not_str(value): + self.merge_write(merge_values[idx], current_values[idx]) + elif value is None: + merge_values[idx] = current_values[idx] + + def _mapped_r_values(self, recv_results: List[any], default_values: List[any]): + """Mapping for read using :py:attribute:`~_control_mapping` and shallow copy""" + mapped_values = numpy.array(default_values) - for idx, mapping in enumerate(self.__control_mapping): + for idx, mapping in enumerate(self._control_mapping): recv = mapping[0] rcu = mapping[1] if recv > 0: @@ -520,14 +587,16 @@ class AntennaToRecvMapper(object): return mapped_values - def _mapped_rw_values(self, set_values, default_values): + def _mapped_rw_values(self, set_values: List[any], default_values: List[any]): + """Mapping for write using :py:attribute:`~_control_mapping` and shallow copy""" + mapped_values = [] - for _ in range(self.__number_of_receivers): + for _ in range(self._number_of_receivers): mapped_values.append(default_values) mapped_values = numpy.array(mapped_values) - for idx, mapping in enumerate(self.__control_mapping): + for idx, mapping in enumerate(self._control_mapping): recv = mapping[0] rcu = mapping[1] if recv > 0: diff --git a/tangostationcontrol/tangostationcontrol/integration_test/README.md b/tangostationcontrol/tangostationcontrol/integration_test/README.md index 972e3a2a9074c2d2fc9994b460a615d37ccc2ca1..d06aa9b504ed46b8bffd711f7ed640729b0d301d 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/README.md +++ b/tangostationcontrol/tangostationcontrol/integration_test/README.md @@ -23,7 +23,7 @@ These arguments and modules can also be passed at the level of the Makefile instead of through tox directly: ```shell -make integration default import.path.class.functionname` +make integration import.path.class.functionname` ``` ## Breakpoints & Debuggers with Integration Tests diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_antennafield.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_antennafield.py index 2a9038950c072c3cdc4742ed8e6c8f0beb902bda..7d071bc8350f82d9b288fd4cfe45bb7ea0bdf88c 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_antennafield.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_antennafield.py @@ -7,20 +7,39 @@ # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. +from tango._tango import DevState import numpy from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse from .base import AbstractTestBases + class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): def setUp(self): super().setUp("STAT/AntennaField/1") - self.proxy.put_property({"RECV_devices": ["STAT/RECV/1"], - "Power_to_RECV_mapping": [1, 1, 1, 0] + [-1] * 92}) + self.proxy.put_property({ + "RECV_devices": ["STAT/RECV/1"], + "Power_to_RECV_mapping": [1, 1, 1, 0] + [-1] * 92 + }) self.recv_proxy = self.setup_recv_proxy() + self.addCleanup(self.restore_antennafield) + self.addCleanup(self.shutdown_recv) + + def restore_antennafield(self): + self.proxy.put_property({ + "RECV_devices": ["STAT/RECV/1"], + "Power_to_RECV_mapping": [-1, -1] * 48, + "Control_to_RECV_mapping": [-1, -1] * 48 + }) + + @staticmethod + def shutdown_recv(): + recv_proxy = TestDeviceProxy("STAT/RECV/1") + recv_proxy.off() + def setup_recv_proxy(self): # setup RECV recv_proxy = TestDeviceProxy("STAT/RECV/1") @@ -32,41 +51,225 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): def test_property_recv_devices_has_one_receiver(self): result = self.proxy.get_property("RECV_devices") self.assertSequenceEqual(result["RECV_devices"], ["STAT/RECV/1"]) - + def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask(self): - """ Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values """ - recv_proxy = self.setup_recv_proxy() + """ Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values""" + antennafield_proxy = self.proxy - numpy.testing.assert_equal(numpy.array([True] * 96), recv_proxy.ANT_mask_RW) + numpy.testing.assert_equal( + numpy.array([True] * 96), self.recv_proxy.ANT_mask_RW + ) antenna_qualities = numpy.array([AntennaQuality.OK] * 96) antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * 95) - antenna_properties = {'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use} - mapping_properties = {"RECV_devices": ["STAT/RECV/1"], - "Power_to_RECV_mapping": [-1, -1] * 48, - "Control_to_RECV_mapping": [1, 0 , 1, 1] + [-1, -1] * 46} + antenna_properties = { + 'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use + } + mapping_properties = { + "RECV_devices": ["STAT/RECV/1"], + "Power_to_RECV_mapping": [-1, -1] * 48, + # Two inputs of recv device connected, only defined for 48 inputs + # each pair is one input + "Control_to_RECV_mapping": [1, 0 , 1, 1] + [-1, -1] * 46 + } antennafield_proxy.off() antennafield_proxy.put_property(antenna_properties) antennafield_proxy.put_property(mapping_properties) - antennafield_proxy.boot() # initialise hardware values as well - numpy.testing.assert_equal(numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R) - numpy.testing.assert_equal(numpy.array([True] * 2 + [False] * 46), antennafield_proxy.ANT_mask_RW) - numpy.testing.assert_equal(numpy.array([True] * 2 + [False] * 46 + [False] * 48), recv_proxy.ANT_mask_RW) + antennafield_proxy.boot() # initialises hardware values as well + + # Verify all antennas are indicated to work + numpy.testing.assert_equal( + numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R + ) + + # Verify only connected inputs + Antenna_Usage_Mask_R are true + # As well as dimensions of ANT_mask_RW must match control mapping + numpy.testing.assert_equal( + numpy.array([True] * 2 + [False] * 46), + antennafield_proxy.ANT_mask_RW + ) + + # Verify recv proxy values unaffected as default for ANT_mask_RW is true + numpy.testing.assert_equal( + numpy.array([True] * 2 + [True] * 94), + self.recv_proxy.ANT_mask_RW + ) def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask_only_one_functioning_antenna(self): - """ Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values (only second antenna is OK)""" - recv_proxy = self.setup_recv_proxy() + """Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values (only second antenna is OK)""" + antennafield_proxy = self.proxy + + # Broken antennas except second antenna_qualities = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * 94) antenna_use = numpy.array([AntennaUse.AUTO] * 96) - antenna_properties = {'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use} - mapping_properties = {"RECV_devices": ["STAT/RECV/1"], - "Power_to_RECV_mapping": [-1, -1] * 48, - "Control_to_RECV_mapping": [1, 0 , 1, 1] + [-1, -1] * 46} + antenna_properties = { + 'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use + } + + # Configure control mapping to control all 96 inputs of recv device + mapping_properties = { + "RECV_devices": ["STAT/RECV/1"], + "Power_to_RECV_mapping": [-1, -1] * 48, + "Control_to_RECV_mapping": + # [1, 0, 1, 1, 1, 2, 1, x ... 1, 95] + numpy.array([[1, x] for x in range(0, 96)]).flatten() + } + + # Cycle device and set properties antennafield_proxy.off() antennafield_proxy.put_property(antenna_properties) antennafield_proxy.put_property(mapping_properties) - antennafield_proxy.boot() # initialise hardware values as well - numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 94), antennafield_proxy.Antenna_Usage_Mask_R) - numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 46), antennafield_proxy.ANT_mask_RW) - numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 46 + [False] * 48), recv_proxy.ANT_mask_RW) + antennafield_proxy.boot() # initialises hardware values as well + + # Antenna_Usage_Mask_R should be false except one + numpy.testing.assert_equal( + numpy.array([False] + [True] + [False] * 94), + antennafield_proxy.Antenna_Usage_Mask_R + ) + # device.boot() writes Antenna_Usage_Mask_R to ANT_mask_RW + numpy.testing.assert_equal( + numpy.array([False] + [True] + [False] * 94), + antennafield_proxy.ANT_mask_RW + ) + # ANT_mask_RW on antennafield writes to configured recv devices for all + # mapped inputs + numpy.testing.assert_equal( + numpy.array([False] + [True] + [False] * 94), + self.recv_proxy.ANT_mask_RW + ) + + def test_antennafield_set_mapped_attribute_ignore_all(self): + """Verify RECV device attribute unaffected by antennafield if not mapped""" + + # Connect recv/1 device but no control inputs + mapping_properties = { + "RECV_devices": ["STAT/RECV/1"], + "Power_to_RECV_mapping": [-1, -1] * 48, + "Control_to_RECV_mapping": [-1, -1] * 48 + } + + # Cycle device an put properties + antennafield_proxy = self.proxy + antennafield_proxy.off() + antennafield_proxy.put_property(mapping_properties) + antennafield_proxy.boot() + + # Set HBAT_PWR_on_RW to false on recv device and read results + self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96) + current_values = self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value + + # write true through antennafield + antennafield_proxy.write_attribute("HBAT_PWR_on_RW", [[True] * 32] * 48) + # Test that original recv values for HBAT_PWR_on_RW match current + numpy.testing.assert_equal( + current_values, + self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value + ) + + # Verify device did not enter FAULT state + self.assertEqual(DevState.ON, antennafield_proxy.state()) + + def test_antennafield_set_mapped_attribute(self): + """Verify RECV device attribute changed by antennafield if mapped inputs""" + + mapping_properties = { + "RECV_devices": ["STAT/RECV/1"], + "Power_to_RECV_mapping": [-1, -1] * 48, + # Each pair is one mapping so 2 inputs are connected + "Control_to_RECV_mapping": [1, 0, 1, 1] + [-1, -1] * 46 + } + + antennafield_proxy = self.proxy + antennafield_proxy.off() + antennafield_proxy.put_property(mapping_properties) + antennafield_proxy.boot() + + self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96) + + try: + antennafield_proxy.write_attribute( + "HBAT_PWR_on_RW", [[True] * 32] * 48 + ) + numpy.testing.assert_equal( + numpy.array([[True] * 32] * 2 + [[False] * 32] * 94), + self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value + ) + finally: + # Always restore recv again + self.recv_proxy.write_attribute( + "HBAT_PWR_on_RW", [[False] * 32] * 96 + ) + + # Verify device did not enter FAULT state + self.assertEqual(DevState.ON, antennafield_proxy.state()) + + def test_antennafield_set_mapped_attribute_all(self): + """Verify RECV device attribute changed by antennafield all inputs mapped""" + + mapping_properties = { + "RECV_devices": ["STAT/RECV/1"], + "Power_to_RECV_mapping": [-1, -1] * 48, + "Control_to_RECV_mapping": + # [1, 0, 1, 1, 1, 2, 1, x ... 1, 95] + numpy.array([[1, x] for x in range(0, 96)]).flatten() + } + + antennafield_proxy = self.proxy + antennafield_proxy.off() + antennafield_proxy.put_property(mapping_properties) + antennafield_proxy.boot() + + self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96) + + try: + antennafield_proxy.write_attribute( + "HBAT_PWR_on_RW", [[True] * 32] * 96 + ) + numpy.testing.assert_equal( + numpy.array([[True] * 32] * 96), + self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value + ) + finally: + # Always restore recv again + self.recv_proxy.write_attribute( + "HBAT_PWR_on_RW", [[False] * 32] * 96 + ) + + # Verify device did not enter FAULT state + self.assertEqual(DevState.ON, antennafield_proxy.state()) + + def test_antennafield_set_mapped_attribute_small(self): + """Verify small RECV device attribute changed all inputs mapped""" + + mapping_properties = { + "RECV_devices": ["STAT/RECV/1"], + "Power_to_RECV_mapping": [-1, -1] * 48, + "Control_to_RECV_mapping": + # [1, 0, 1, 1, 1, 2, 1, x ... 1, 95] + numpy.array([[1, x] for x in range(0, 96)]).flatten() + } + + antennafield_proxy = self.proxy + antennafield_proxy.off() + antennafield_proxy.put_property(mapping_properties) + antennafield_proxy.boot() + + self.recv_proxy.write_attribute("RCU_band_select_RW", [False] * 96) + + try: + antennafield_proxy.write_attribute( + "RCU_band_select_RW", [True] * 96 + ) + numpy.testing.assert_equal( + numpy.array([True] * 96), + self.recv_proxy.read_attribute("RCU_band_select_RW").value + ) + finally: + # Always restore recv again + self.recv_proxy.write_attribute( + "RCU_band_select_RW", [False] * 96 + ) + + # Verify device did not enter FAULT state + self.assertEqual(DevState.ON, antennafield_proxy.state()) diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py index d4ba9f993a56dd3e6f20c03ef43e06e6ebfb02c4..aed6b32314ddc6a2a69fe3f939a0c7ccaa4b6fda 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py @@ -63,9 +63,11 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): NR_TILES = 48 antennafield_proxy = TestDeviceProxy(self.antennafield_iden) control_mapping = [[1,i] for i in range(NR_TILES)] - antennafield_proxy.put_property({"RECV_devices": [self.recv_iden], - "HBAT_Control_to_RECV_mapping": numpy.array(control_mapping).flatten(), - 'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use}) + antennafield_proxy.put_property({ + "RECV_devices": [self.recv_iden], + "Control_to_RECV_mapping": numpy.array(control_mapping).flatten(), + 'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use} + ) antennafield_proxy.off() antennafield_proxy.boot() return antennafield_proxy diff --git a/tangostationcontrol/tangostationcontrol/integration_test/recv_cluster/test_recv_cluster.py b/tangostationcontrol/tangostationcontrol/integration_test/recv_cluster/test_recv_cluster.py index fea5e86b8b896cff43f6c38b8122b8ad2b08ac30..e6c77e3998fbcceacce5611b42c91988a55258c0 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/recv_cluster/test_recv_cluster.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/recv_cluster/test_recv_cluster.py @@ -22,9 +22,9 @@ logger = logging.getLogger() class TestRecvCluster(base.IntegrationTestCase): - + # The AntennaField is setup with self.NR_TILES tiles in the test configuration NR_TILES = 48 - POINTING_DIRECTION = numpy.array([["J2000","0deg","0deg"]] * NR_TILES).flatten() + POINTING_DIRECTION = numpy.array([["J2000", "0deg", "0deg"]] * NR_TILES).flatten() def setUp(self): diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py index 2e4b798e52545e6e0e66b176a27da6c90314f4df..85d10cfab14fe784e250694d08afd6b9fbe2b2ac 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py @@ -7,6 +7,13 @@ # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. +import time +import statistics +import logging + +import unittest +from unittest import mock + import numpy from tango.test_context import DeviceTestContext @@ -16,6 +23,8 @@ from tangostationcontrol.devices.antennafield import AntennaToRecvMapper, Antenn from tangostationcontrol.test import base from tangostationcontrol.test.devices import device_base +logger = logging.getLogger() + class TestAntennaToRecvMapper(base.TestCase): @@ -190,50 +199,54 @@ class TestAntennaToRecvMapper(base.TestCase): # Rename to write def test_map_write_ant_mask_rw_no_mapping_and_one_receiver(self): + """Verify results None without control and array sizes""" + mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [False] * 48 - expected = [[False] * 96] + set_values = [None] * 48 + expected = [[None] * 96] actual = mapper.map_write("ANT_mask_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_ant_mask_rw_no_mapping_and_two_receivers(self): + """Verify results None without control and array sizes""" + mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [False] * 48 - expected = [[False] * 96] * 2 + set_values = [None] * 48 + expected = [[None] * 96] * 2 actual = mapper.map_write("ANT_mask_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_ant_mask_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [True, False] + [False] * 46 - expected = [[False, True, False] + [False] * 93] + set_values = [True, False] + [None] * 46 + expected = [[False, True] + [None] * 94] actual = mapper.map_write("ANT_mask_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_band_select_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [0] * 48 - expected = [[0] * 96] + set_values = [None] * 48 + expected = [[None] * 96] actual = mapper.map_write("RCU_band_select_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_band_select_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [0] * 48 - expected = [[0] * 96] * 2 + set_values = [None] * 48 + expected = [[None] * 96] * 2 actual = mapper.map_write("RCU_band_select_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_rcu_band_select_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [1, 0] + [0] * 46 - expected = [[0, 1, 0] + [0] * 93] + set_values = [1, 0] + [None] * 46 + expected = [[0, 1] + [None] * 94] actual = mapper.map_write("RCU_band_select_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -241,7 +254,7 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) set_values = [[1] * 32] * 48 - expected = [[[0] * 32] * 96] + expected = [[[None] * 32] * 96] actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) numpy.testing.assert_equal(expected, actual) @@ -249,104 +262,138 @@ class TestAntennaToRecvMapper(base.TestCase): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) set_values = [[1] * 32] * 48 - expected = [[[0] * 32] * 96, [[0] * 32] * 96] + expected = [[[None] * 32] * 96, [[None] * 32] * 96] actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_bf_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[1] * 32, [2] * 32] + [[0] * 32] * 46 - expected = [[[2] * 32, [1] * 32] + [[0] * 32] * 94] + set_values = [[1] * 32, [2] * 32] + [[None] * 32] * 46 + expected = [[[2] * 32, [1] * 32] + [[None] * 32] * 94] actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [[False] * 32] * 48 - expected = [[[False] * 32] * 96] + set_values = [[None] * 32] * 48 + expected = [[[None] * 32] * 96] actual = mapper.map_write("HBAT_LED_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [[False] * 32] * 48 - expected = [[[False] * 32] * 96, [[False] * 32] * 96] + set_values = [[None] * 32] * 48 + expected = [[[None] * 32] * 96, [[None] * 32] * 96] actual = mapper.map_write("HBAT_LED_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46 - expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94] + set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46 + expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94] actual = mapper.map_write("HBAT_LED_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - set_values = [[False] * 32] * 48 - expected = [[[False] * 32] * 96] + set_values = [[None] * 32] * 48 + expected = [[[None] * 32] * 96] actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [[False] * 32] * 48 - expected = [[[False] * 32] * 96, [[False] * 32] * 96] + set_values = [[None] * 32] * 48 + expected = [[[None] * 32] * 96, [[None] * 32] * 96] actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46 - expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94] + set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46 + expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94] actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_on_rw_no_mapping_and_one_receiver(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1) - - set_values = [[False] * 32] * 48 - expected = [[[False] * 32] * 96] + set_values = [[None] * 32] * 48 + expected = [[[None] * 32] * 96] actual = mapper.map_write("HBAT_PWR_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_lna_on_rw_no_mapping_and_two_receivers(self): mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2) - set_values = [[False] * 32] * 48 - expected = [[[False] * 32] * 96, [[False] * 32] * 96] + set_values = [[None] * 32] * 48 + expected = [[[None] * 32] * 96, [[None] * 32] * 96] actual = mapper.map_write("HBAT_PWR_on_RW", set_values) numpy.testing.assert_equal(expected, actual) def test_map_write_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self): mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1) - set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46 - expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94] + set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46 + expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94] actual = mapper.map_write("HBAT_PWR_on_RW", set_values) numpy.testing.assert_equal(expected, actual) + def test_merge_write(self): + """Verify all None fields are replaced by merge_write if no control""" + + mapper = AntennaToRecvMapper( + self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1 + ) + + merge_values = [[None] * 32] * 96 + current_values = [[False] * 32] * 96 + + mapper.merge_write(merge_values, current_values) + numpy.testing.assert_equal(merge_values, current_values) + + results = [] + for _i in range(25): + start_time = time.monotonic_ns() + mapper.merge_write(merge_values, current_values) + stop_time = time.monotonic_ns() + results.append(stop_time - start_time) + + logging.error( + f"Merge write performance: Median {statistics.median(results) / 1.e9} " + f"Stdev {statistics.stdev(results) / 1.e9}" + ) + + def test_merge_write_values(self): + """Verify all fields with values are retained by merge_write""" + + mapper = AntennaToRecvMapper( + self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1 + ) + + merge_values = [[True] * 32] * 2 + [[None] * 32] * 94 + current_values = [[True] * 32] * 2 + [[False] * 32] * 94 + + mapper.merge_write(merge_values, current_values) + numpy.testing.assert_equal(merge_values, current_values) + class TestAntennafieldDevice(device_base.DeviceTestCase): # some dummy values for mandatory properties - AT_PROPERTIES = {'OPC_Server_Name': 'example.com', 'OPC_Server_Port': 4840, 'OPC_Time_Out': 5.0, - 'Antenna_Field_Reference_ITRF' : [3.0, 3.0, 3.0], 'Antenna_Field_Reference_ETRS' : [7.0, 7.0, 7.0]} - - # A mapping where Antennas are all not mapped to power RCUs - POWER_NOT_CONNECTED = [[-1, -1]] * 48 - # A mapping where Antennas are all not mapped to control RCUs - CONTROL_NOT_CONNECTED = [[-1, -1]] * 48 - # A mapping where first two Antennas are mapped on the first Receiver. - # The first Antenna control line on RCU 1 and the second Antenna control line on RCU 0. - CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * 46 + AT_PROPERTIES = { + 'OPC_Server_Name': 'example.com', + 'OPC_Server_Port': 4840, + 'OPC_Time_Out': 5.0, + 'Antenna_Field_Reference_ITRF': [3.0, 3.0, 3.0], + 'Antenna_Field_Reference_ETRS': [7.0, 7.0, 7.0], + } def setUp(self): # DeviceTestCase setUp patches lofar_device DeviceProxy @@ -397,3 +444,33 @@ class TestAntennafieldDevice(device_base.DeviceTestCase): with DeviceTestContext(antennafield.AntennaField, properties={**self.AT_PROPERTIES, **antenna_properties}, process=True) as proxy: for i in range(len(antenna_names)): self.assertTrue(proxy.Antenna_Names_R[i]==f"C{i}") + + @unittest.skip("Test for manual use, enable at most one (process=false)") + @mock.patch.object(antennafield, "DeviceProxy") + def test_set_mapped_attribute(self, m_proxy): + """Verify set_mapped_attribute only modifies controlled inputs""" + + antenna_properties = { + 'RECV_devices': ['stat/RECV/1'], + } + + data = numpy.array([[False] * 32] * 96) + + m_proxy.return_value = mock.Mock( + read_attribute=mock.Mock( + return_value=mock.Mock(value=data) + ) + ) + + with DeviceTestContext( + antennafield.AntennaField, process=False, + properties={**self.AT_PROPERTIES, **antenna_properties} + ) as proxy: + proxy.boot() + + proxy.write_attribute("HBAT_PWR_on_RW", numpy.array([[False] * 32] * 48)) + + numpy.testing.assert_equal( + m_proxy.return_value.write_attribute.call_args[0][1], + data + )