Skip to content
Snippets Groups Projects
Commit 068ddf5f authored by Corné Lukken's avatar Corné Lukken
Browse files

Merge branch 'L2SS-876-antennafield-defaults' into 'master'

L2SS-876: Get current RECV values before updating through antennafield

Closes L2SS-876

See merge request !408
parents 7cec2435 c91be878
No related branches found
No related tags found
1 merge request!408L2SS-876: Get current RECV values before updating through antennafield
# -*- coding: utf-8 -*-
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from collections.abc import Sequence
import numpy
def is_sequence(obj):
"""Identify sequences / collections"""
return isinstance(obj, Sequence) or isinstance(obj, numpy.ndarray)
def sequence_not_str(obj):
"""Separate sequences / collections from str, byte or bytearray"""
return is_sequence(obj) and not isinstance(obj, (str, bytes, bytearray))
def type_not_sequence(obj):
"""Separate sequences / collections from types"""
return not is_sequence(obj) and isinstance(obj, type)
......@@ -9,12 +9,15 @@
from enum import IntEnum
from math import pi
import numpy
from typing import List
# PyTango imports
from tango import DeviceProxy, DevSource, AttrWriteType, DevVarFloatArray, DevVarLongArray
from tango.server import device_property, attribute, command
# Additional import
from tangostationcontrol.common.type_checking import sequence_not_str
from tangostationcontrol.common.type_checking import type_not_sequence
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.devices.lofar_device import lofar_device
from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions
......@@ -47,7 +50,10 @@ class mapped_attribute(attribute):
if access == AttrWriteType.READ_WRITE:
@fault_on_error()
def write_func_wrapper(device, value):
write_func = device.set_mapped_attribute(mapping_attribute, value)
cast_type = dtype
while not type_not_sequence(cast_type):
cast_type = cast_type[0]
write_func = device.set_mapped_attribute(mapping_attribute, value, cast_type)
self.fset = write_func_wrapper
......@@ -296,7 +302,7 @@ class AntennaField(lofar_device):
antennas_auto_on = numpy.logical_and(use == AntennaUse.AUTO, quality <= AntennaQuality.SUSPICIOUS)
return numpy.logical_or(antennas_forced_on, antennas_auto_on)
def read_nr_antennas_R(self):
# The number of antennas should be equal to:
# * the number of elements in the Control_to_RECV_mapping (after reshaping),
......@@ -384,7 +390,7 @@ class AntennaField(lofar_device):
power_mapping = numpy.reshape(self.Power_to_RECV_mapping, (-1, 2))
self.__mapper = AntennaToRecvMapper(control_mapping, power_mapping, number_of_receivers)
def get_mapped_attribute(self, mapped_point):
def get_mapped_attribute(self, mapped_point: str):
recv_results = []
for recv_proxy in self.recv_proxies:
......@@ -395,11 +401,24 @@ class AntennaField(lofar_device):
return mapped_values
def set_mapped_attribute(self, mapped_point, value):
def set_mapped_attribute(self, mapped_point: str, value, cast_type: type):
"""Set the attribute to new value only for controlled points
:warning: This method is susceptible to a lost update race condition if the
attribute on the RECV device is written to in between `read_attribute`
and `write_attribute`!
"""
mapped_value = self.__mapper.map_write(mapped_point, value)
for idx, recv_proxy in enumerate(self.recv_proxies):
recv_proxy.write_attribute(mapped_point, mapped_value[idx])
new_values = mapped_value[idx]
# TODO(Corne): Resolve potential lost update race condition
current_values = recv_proxy.read_attribute(mapped_point).value
self.__mapper.merge_write(new_values, current_values)
recv_proxy.write_attribute(mapped_point, new_values.astype(dtype=cast_type))
# --------
# Overloaded functions
......@@ -452,67 +471,115 @@ class AntennaField(lofar_device):
return result_values.flatten()
class AntennaToRecvMapper(object):
_VALUE_MAP_NONE_96 = numpy.full(96, None)
_VALUE_MAP_NONE_96_32 = numpy.full((96, 32), None)
def __init__(self, control_to_recv_mapping, power_to_recv_mapping, number_of_receivers):
number_of_antennas = len(control_to_recv_mapping)
self.__control_mapping = control_to_recv_mapping
self.__power_mapping = power_to_recv_mapping
self.__number_of_receivers = number_of_receivers
self.__default_value_mapping_read = {
"ANT_mask_RW": numpy.full(number_of_antennas, False),
"RCU_PWR_ANT_on_R": numpy.full(number_of_antennas, False),
"RCU_PWR_ANT_on_RW": numpy.full(number_of_antennas, False),
"HBAT_BF_delay_steps_R": numpy.zeros([number_of_antennas,32], dtype=numpy.int64),
"HBAT_BF_delay_steps_RW": numpy.zeros([number_of_antennas,32], dtype=numpy.int64),
"HBAT_LED_on_R": numpy.full((number_of_antennas,32), False),
"HBAT_LED_on_RW": numpy.full((number_of_antennas,32), False),
"HBAT_PWR_LNA_on_R": numpy.full((number_of_antennas,32), False),
"HBAT_PWR_LNA_on_RW": numpy.full((number_of_antennas,32), False),
"HBAT_PWR_on_R": numpy.full((number_of_antennas,32), False),
"HBAT_PWR_on_RW": numpy.full((number_of_antennas,32), False),
# Reduce memory footprint of default values by creating single instance of
# common fields
value_map_ant_32_int = numpy.zeros([number_of_antennas, 32], dtype=numpy.int64)
value_map_ant_32_bool = numpy.full((number_of_antennas, 32), False)
value_map_ant_bool = numpy.full(number_of_antennas, False)
self._control_mapping = control_to_recv_mapping
self._power_mapping = power_to_recv_mapping
self._number_of_receivers = number_of_receivers
self._default_value_mapping_read = {
"ANT_mask_RW": value_map_ant_bool,
"RCU_PWR_ANT_on_R": value_map_ant_bool,
"RCU_PWR_ANT_on_RW": value_map_ant_bool,
"HBAT_BF_delay_steps_R": value_map_ant_32_int,
"HBAT_BF_delay_steps_RW": value_map_ant_32_int,
"HBAT_LED_on_R": value_map_ant_32_bool,
"HBAT_LED_on_RW": value_map_ant_32_bool,
"HBAT_PWR_LNA_on_R": value_map_ant_32_bool,
"HBAT_PWR_LNA_on_RW": value_map_ant_32_bool,
"HBAT_PWR_on_R": value_map_ant_32_bool,
"HBAT_PWR_on_RW": value_map_ant_32_bool,
"RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64)
}
self.__default_value_mapping_write = {
"ANT_mask_RW": numpy.full(96, False),
"RCU_PWR_ANT_on_RW": numpy.full(96, False),
"HBAT_BF_delay_steps_RW": numpy.zeros([96,32], dtype=numpy.int64),
"HBAT_LED_on_RW": numpy.full((96,32), False),
"HBAT_PWR_LNA_on_RW": numpy.full((96,32), False),
"HBAT_PWR_on_RW": numpy.full((96,32), False),
"RCU_band_select_RW": numpy.zeros(96, dtype=numpy.int64)
self._masked_value_mapping_write = {
"ANT_mask_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"RCU_PWR_ANT_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"HBAT_BF_delay_steps_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_LED_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_PWR_LNA_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_PWR_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"RCU_band_select_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
}
self.__reshape_attributes_in = {
self._reshape_attributes_in = {
"HBAT_BF_delay_steps_RW": (96, 32),
}
self.__reshape_attributes_out = {
self._reshape_attributes_out = {
"HBAT_BF_delay_steps_RW": (96, 32),
}
def map_read(self, mapped_attribute, recv_results):
default_values = self.__default_value_mapping_read[mapped_attribute]
def map_read(self, mapped_attribute: str, recv_results: List[any]) -> List[any]:
"""Perform a mapped read for the attribute using the recv_results
:param mapped_attribute: attribute identifier as present in
py:attribute:`~_default_value_mapping_read`
:param recv_results: Results as gathered by appending attributes from RECV
devices
:return: recv_results as mapped given attribute dimensions and control mapping
"""
if mapped_attribute in self.__reshape_attributes_in:
default_values = self._default_value_mapping_read[mapped_attribute]
if mapped_attribute in self._reshape_attributes_in:
recv_results = numpy.reshape(recv_results,
(self.__number_of_receivers,) + self.__reshape_attributes_in[mapped_attribute])
(self._number_of_receivers,) + self._reshape_attributes_in[mapped_attribute])
return self._mapped_r_values(recv_results, default_values)
def map_write(self, mapped_attribute, set_values):
default_values = self.__default_value_mapping_write[mapped_attribute]
def map_write(self, mapped_attribute: str, set_values: List[any]) -> List[any]:
"""Perform a mapped write for the attribute using the set_values
:param mapped_attribute: attribute identifier as present in
py:attribute:`~_default_value_mapping_write`
:param set_values: The values to be set for the specified attribute
:return: set_values as mapped given attribute dimensions and control mapping
"""
default_values = self._masked_value_mapping_write[mapped_attribute]
mapped_values = self._mapped_rw_values(set_values, default_values)
if mapped_attribute in self.__reshape_attributes_out:
if mapped_attribute in self._reshape_attributes_out:
mapped_values = numpy.reshape(mapped_values,
(self.__number_of_receivers,) + self.__reshape_attributes_out[mapped_attribute])
(self._number_of_receivers,) + self._reshape_attributes_out[mapped_attribute])
return mapped_values
def _mapped_r_values(self, recv_results, default_values):
def merge_write(self, merge_values: List[any], current_values: List[any]):
"""Merge values as retrieved from :py:func:`~map_write` with current_values
This method will modify the contents of merge_values.
To be used by the :py:class:`~AntennaField` device to remove None fields
from mapped_values with recently retrieved current_values from RECV device.
:param merge_values: values as retrieved from :py:func:`~map_write`
:param current_values: values retrieved from RECV device on specific attribute
"""
for idx, value in enumerate(merge_values):
if sequence_not_str(value):
self.merge_write(merge_values[idx], current_values[idx])
elif value is None:
merge_values[idx] = current_values[idx]
def _mapped_r_values(self, recv_results: List[any], default_values: List[any]):
"""Mapping for read using :py:attribute:`~_control_mapping` and shallow copy"""
mapped_values = numpy.array(default_values)
for idx, mapping in enumerate(self.__control_mapping):
for idx, mapping in enumerate(self._control_mapping):
recv = mapping[0]
rcu = mapping[1]
if recv > 0:
......@@ -520,14 +587,16 @@ class AntennaToRecvMapper(object):
return mapped_values
def _mapped_rw_values(self, set_values, default_values):
def _mapped_rw_values(self, set_values: List[any], default_values: List[any]):
"""Mapping for write using :py:attribute:`~_control_mapping` and shallow copy"""
mapped_values = []
for _ in range(self.__number_of_receivers):
for _ in range(self._number_of_receivers):
mapped_values.append(default_values)
mapped_values = numpy.array(mapped_values)
for idx, mapping in enumerate(self.__control_mapping):
for idx, mapping in enumerate(self._control_mapping):
recv = mapping[0]
rcu = mapping[1]
if recv > 0:
......
......@@ -23,7 +23,7 @@ These arguments and modules can also be passed at the level of the Makefile
instead of through tox directly:
```shell
make integration default import.path.class.functionname`
make integration import.path.class.functionname`
```
## Breakpoints & Debuggers with Integration Tests
......
......@@ -7,20 +7,39 @@
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from tango._tango import DevState
import numpy
from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy
from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse
from .base import AbstractTestBases
class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
def setUp(self):
super().setUp("STAT/AntennaField/1")
self.proxy.put_property({"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [1, 1, 1, 0] + [-1] * 92})
self.proxy.put_property({
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [1, 1, 1, 0] + [-1] * 92
})
self.recv_proxy = self.setup_recv_proxy()
self.addCleanup(self.restore_antennafield)
self.addCleanup(self.shutdown_recv)
def restore_antennafield(self):
self.proxy.put_property({
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48,
"Control_to_RECV_mapping": [-1, -1] * 48
})
@staticmethod
def shutdown_recv():
recv_proxy = TestDeviceProxy("STAT/RECV/1")
recv_proxy.off()
def setup_recv_proxy(self):
# setup RECV
recv_proxy = TestDeviceProxy("STAT/RECV/1")
......@@ -32,41 +51,225 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
def test_property_recv_devices_has_one_receiver(self):
result = self.proxy.get_property("RECV_devices")
self.assertSequenceEqual(result["RECV_devices"], ["STAT/RECV/1"])
def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask(self):
""" Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values """
recv_proxy = self.setup_recv_proxy()
""" Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values"""
antennafield_proxy = self.proxy
numpy.testing.assert_equal(numpy.array([True] * 96), recv_proxy.ANT_mask_RW)
numpy.testing.assert_equal(
numpy.array([True] * 96), self.recv_proxy.ANT_mask_RW
)
antenna_qualities = numpy.array([AntennaQuality.OK] * 96)
antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * 95)
antenna_properties = {'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use}
mapping_properties = {"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48,
"Control_to_RECV_mapping": [1, 0 , 1, 1] + [-1, -1] * 46}
antenna_properties = {
'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use
}
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48,
# Two inputs of recv device connected, only defined for 48 inputs
# each pair is one input
"Control_to_RECV_mapping": [1, 0 , 1, 1] + [-1, -1] * 46
}
antennafield_proxy.off()
antennafield_proxy.put_property(antenna_properties)
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot() # initialise hardware values as well
numpy.testing.assert_equal(numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R)
numpy.testing.assert_equal(numpy.array([True] * 2 + [False] * 46), antennafield_proxy.ANT_mask_RW)
numpy.testing.assert_equal(numpy.array([True] * 2 + [False] * 46 + [False] * 48), recv_proxy.ANT_mask_RW)
antennafield_proxy.boot() # initialises hardware values as well
# Verify all antennas are indicated to work
numpy.testing.assert_equal(
numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R
)
# Verify only connected inputs + Antenna_Usage_Mask_R are true
# As well as dimensions of ANT_mask_RW must match control mapping
numpy.testing.assert_equal(
numpy.array([True] * 2 + [False] * 46),
antennafield_proxy.ANT_mask_RW
)
# Verify recv proxy values unaffected as default for ANT_mask_RW is true
numpy.testing.assert_equal(
numpy.array([True] * 2 + [True] * 94),
self.recv_proxy.ANT_mask_RW
)
def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask_only_one_functioning_antenna(self):
""" Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values (only second antenna is OK)"""
recv_proxy = self.setup_recv_proxy()
"""Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values (only second antenna is OK)"""
antennafield_proxy = self.proxy
# Broken antennas except second
antenna_qualities = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * 94)
antenna_use = numpy.array([AntennaUse.AUTO] * 96)
antenna_properties = {'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use}
mapping_properties = {"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48,
"Control_to_RECV_mapping": [1, 0 , 1, 1] + [-1, -1] * 46}
antenna_properties = {
'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use
}
# Configure control mapping to control all 96 inputs of recv device
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48,
"Control_to_RECV_mapping":
# [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
numpy.array([[1, x] for x in range(0, 96)]).flatten()
}
# Cycle device and set properties
antennafield_proxy.off()
antennafield_proxy.put_property(antenna_properties)
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot() # initialise hardware values as well
numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 94), antennafield_proxy.Antenna_Usage_Mask_R)
numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 46), antennafield_proxy.ANT_mask_RW)
numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 46 + [False] * 48), recv_proxy.ANT_mask_RW)
antennafield_proxy.boot() # initialises hardware values as well
# Antenna_Usage_Mask_R should be false except one
numpy.testing.assert_equal(
numpy.array([False] + [True] + [False] * 94),
antennafield_proxy.Antenna_Usage_Mask_R
)
# device.boot() writes Antenna_Usage_Mask_R to ANT_mask_RW
numpy.testing.assert_equal(
numpy.array([False] + [True] + [False] * 94),
antennafield_proxy.ANT_mask_RW
)
# ANT_mask_RW on antennafield writes to configured recv devices for all
# mapped inputs
numpy.testing.assert_equal(
numpy.array([False] + [True] + [False] * 94),
self.recv_proxy.ANT_mask_RW
)
def test_antennafield_set_mapped_attribute_ignore_all(self):
"""Verify RECV device attribute unaffected by antennafield if not mapped"""
# Connect recv/1 device but no control inputs
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48,
"Control_to_RECV_mapping": [-1, -1] * 48
}
# Cycle device an put properties
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
# Set HBAT_PWR_on_RW to false on recv device and read results
self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96)
current_values = self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
# write true through antennafield
antennafield_proxy.write_attribute("HBAT_PWR_on_RW", [[True] * 32] * 48)
# Test that original recv values for HBAT_PWR_on_RW match current
numpy.testing.assert_equal(
current_values,
self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
def test_antennafield_set_mapped_attribute(self):
"""Verify RECV device attribute changed by antennafield if mapped inputs"""
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48,
# Each pair is one mapping so 2 inputs are connected
"Control_to_RECV_mapping": [1, 0, 1, 1] + [-1, -1] * 46
}
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96)
try:
antennafield_proxy.write_attribute(
"HBAT_PWR_on_RW", [[True] * 32] * 48
)
numpy.testing.assert_equal(
numpy.array([[True] * 32] * 2 + [[False] * 32] * 94),
self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
)
finally:
# Always restore recv again
self.recv_proxy.write_attribute(
"HBAT_PWR_on_RW", [[False] * 32] * 96
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
def test_antennafield_set_mapped_attribute_all(self):
"""Verify RECV device attribute changed by antennafield all inputs mapped"""
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48,
"Control_to_RECV_mapping":
# [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
numpy.array([[1, x] for x in range(0, 96)]).flatten()
}
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96)
try:
antennafield_proxy.write_attribute(
"HBAT_PWR_on_RW", [[True] * 32] * 96
)
numpy.testing.assert_equal(
numpy.array([[True] * 32] * 96),
self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
)
finally:
# Always restore recv again
self.recv_proxy.write_attribute(
"HBAT_PWR_on_RW", [[False] * 32] * 96
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
def test_antennafield_set_mapped_attribute_small(self):
"""Verify small RECV device attribute changed all inputs mapped"""
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"Power_to_RECV_mapping": [-1, -1] * 48,
"Control_to_RECV_mapping":
# [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
numpy.array([[1, x] for x in range(0, 96)]).flatten()
}
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute("RCU_band_select_RW", [False] * 96)
try:
antennafield_proxy.write_attribute(
"RCU_band_select_RW", [True] * 96
)
numpy.testing.assert_equal(
numpy.array([True] * 96),
self.recv_proxy.read_attribute("RCU_band_select_RW").value
)
finally:
# Always restore recv again
self.recv_proxy.write_attribute(
"RCU_band_select_RW", [False] * 96
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
......@@ -63,9 +63,11 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
NR_TILES = 48
antennafield_proxy = TestDeviceProxy(self.antennafield_iden)
control_mapping = [[1,i] for i in range(NR_TILES)]
antennafield_proxy.put_property({"RECV_devices": [self.recv_iden],
"HBAT_Control_to_RECV_mapping": numpy.array(control_mapping).flatten(),
'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use})
antennafield_proxy.put_property({
"RECV_devices": [self.recv_iden],
"Control_to_RECV_mapping": numpy.array(control_mapping).flatten(),
'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use}
)
antennafield_proxy.off()
antennafield_proxy.boot()
return antennafield_proxy
......
......@@ -22,9 +22,9 @@ logger = logging.getLogger()
class TestRecvCluster(base.IntegrationTestCase):
# The AntennaField is setup with self.NR_TILES tiles in the test configuration
NR_TILES = 48
POINTING_DIRECTION = numpy.array([["J2000","0deg","0deg"]] * NR_TILES).flatten()
POINTING_DIRECTION = numpy.array([["J2000", "0deg", "0deg"]] * NR_TILES).flatten()
def setUp(self):
......
......@@ -7,6 +7,13 @@
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import time
import statistics
import logging
import unittest
from unittest import mock
import numpy
from tango.test_context import DeviceTestContext
......@@ -16,6 +23,8 @@ from tangostationcontrol.devices.antennafield import AntennaToRecvMapper, Antenn
from tangostationcontrol.test import base
from tangostationcontrol.test.devices import device_base
logger = logging.getLogger()
class TestAntennaToRecvMapper(base.TestCase):
......@@ -190,50 +199,54 @@ class TestAntennaToRecvMapper(base.TestCase):
# Rename to write
def test_map_write_ant_mask_rw_no_mapping_and_one_receiver(self):
"""Verify results None without control and array sizes"""
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [False] * 48
expected = [[False] * 96]
set_values = [None] * 48
expected = [[None] * 96]
actual = mapper.map_write("ANT_mask_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_ant_mask_rw_no_mapping_and_two_receivers(self):
"""Verify results None without control and array sizes"""
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [False] * 48
expected = [[False] * 96] * 2
set_values = [None] * 48
expected = [[None] * 96] * 2
actual = mapper.map_write("ANT_mask_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_ant_mask_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [True, False] + [False] * 46
expected = [[False, True, False] + [False] * 93]
set_values = [True, False] + [None] * 46
expected = [[False, True] + [None] * 94]
actual = mapper.map_write("ANT_mask_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_band_select_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [0] * 48
expected = [[0] * 96]
set_values = [None] * 48
expected = [[None] * 96]
actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_band_select_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [0] * 48
expected = [[0] * 96] * 2
set_values = [None] * 48
expected = [[None] * 96] * 2
actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_band_select_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [1, 0] + [0] * 46
expected = [[0, 1, 0] + [0] * 93]
set_values = [1, 0] + [None] * 46
expected = [[0, 1] + [None] * 94]
actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual)
......@@ -241,7 +254,7 @@ class TestAntennaToRecvMapper(base.TestCase):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [[1] * 32] * 48
expected = [[[0] * 32] * 96]
expected = [[[None] * 32] * 96]
actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values)
numpy.testing.assert_equal(expected, actual)
......@@ -249,104 +262,138 @@ class TestAntennaToRecvMapper(base.TestCase):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [[1] * 32] * 48
expected = [[[0] * 32] * 96, [[0] * 32] * 96]
expected = [[[None] * 32] * 96, [[None] * 32] * 96]
actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_bf_delay_steps_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [[1] * 32, [2] * 32] + [[0] * 32] * 46
expected = [[[2] * 32, [1] * 32] + [[0] * 32] * 94]
set_values = [[1] * 32, [2] * 32] + [[None] * 32] * 46
expected = [[[2] * 32, [1] * 32] + [[None] * 32] * 94]
actual = mapper.map_write("HBAT_BF_delay_steps_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96]
set_values = [[None] * 32] * 48
expected = [[[None] * 32] * 96]
actual = mapper.map_write("HBAT_LED_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96, [[False] * 32] * 96]
set_values = [[None] * 32] * 48
expected = [[[None] * 32] * 96, [[None] * 32] * 96]
actual = mapper.map_write("HBAT_LED_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_led_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94]
set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94]
actual = mapper.map_write("HBAT_LED_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96]
set_values = [[None] * 32] * 48
expected = [[[None] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96, [[False] * 32] * 96]
set_values = [[None] * 32] * 48
expected = [[[None] * 32] * 96, [[None] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_lna_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94]
set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94]
actual = mapper.map_write("HBAT_PWR_LNA_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_on_rw_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96]
set_values = [[None] * 32] * 48
expected = [[[None] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_lna_on_rw_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [[False] * 32] * 48
expected = [[[False] * 32] * 96, [[False] * 32] * 96]
set_values = [[None] * 32] * 48
expected = [[[None] * 32] * 96, [[None] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_pwr_on_rw_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [[False, True] * 16, [True, False] * 16] + [[False] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[False] * 32] * 94]
set_values = [[False, True] * 16, [True, False] * 16] + [[None] * 32] * 46
expected = [[[True, False] * 16, [False, True] * 16] + [[None] * 32] * 94]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_merge_write(self):
"""Verify all None fields are replaced by merge_write if no control"""
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1
)
merge_values = [[None] * 32] * 96
current_values = [[False] * 32] * 96
mapper.merge_write(merge_values, current_values)
numpy.testing.assert_equal(merge_values, current_values)
results = []
for _i in range(25):
start_time = time.monotonic_ns()
mapper.merge_write(merge_values, current_values)
stop_time = time.monotonic_ns()
results.append(stop_time - start_time)
logging.error(
f"Merge write performance: Median {statistics.median(results) / 1.e9} "
f"Stdev {statistics.stdev(results) / 1.e9}"
)
def test_merge_write_values(self):
"""Verify all fields with values are retained by merge_write"""
mapper = AntennaToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1
)
merge_values = [[True] * 32] * 2 + [[None] * 32] * 94
current_values = [[True] * 32] * 2 + [[False] * 32] * 94
mapper.merge_write(merge_values, current_values)
numpy.testing.assert_equal(merge_values, current_values)
class TestAntennafieldDevice(device_base.DeviceTestCase):
# some dummy values for mandatory properties
AT_PROPERTIES = {'OPC_Server_Name': 'example.com', 'OPC_Server_Port': 4840, 'OPC_Time_Out': 5.0,
'Antenna_Field_Reference_ITRF' : [3.0, 3.0, 3.0], 'Antenna_Field_Reference_ETRS' : [7.0, 7.0, 7.0]}
# A mapping where Antennas are all not mapped to power RCUs
POWER_NOT_CONNECTED = [[-1, -1]] * 48
# A mapping where Antennas are all not mapped to control RCUs
CONTROL_NOT_CONNECTED = [[-1, -1]] * 48
# A mapping where first two Antennas are mapped on the first Receiver.
# The first Antenna control line on RCU 1 and the second Antenna control line on RCU 0.
CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * 46
AT_PROPERTIES = {
'OPC_Server_Name': 'example.com',
'OPC_Server_Port': 4840,
'OPC_Time_Out': 5.0,
'Antenna_Field_Reference_ITRF': [3.0, 3.0, 3.0],
'Antenna_Field_Reference_ETRS': [7.0, 7.0, 7.0],
}
def setUp(self):
# DeviceTestCase setUp patches lofar_device DeviceProxy
......@@ -397,3 +444,33 @@ class TestAntennafieldDevice(device_base.DeviceTestCase):
with DeviceTestContext(antennafield.AntennaField, properties={**self.AT_PROPERTIES, **antenna_properties}, process=True) as proxy:
for i in range(len(antenna_names)):
self.assertTrue(proxy.Antenna_Names_R[i]==f"C{i}")
@unittest.skip("Test for manual use, enable at most one (process=false)")
@mock.patch.object(antennafield, "DeviceProxy")
def test_set_mapped_attribute(self, m_proxy):
"""Verify set_mapped_attribute only modifies controlled inputs"""
antenna_properties = {
'RECV_devices': ['stat/RECV/1'],
}
data = numpy.array([[False] * 32] * 96)
m_proxy.return_value = mock.Mock(
read_attribute=mock.Mock(
return_value=mock.Mock(value=data)
)
)
with DeviceTestContext(
antennafield.AntennaField, process=False,
properties={**self.AT_PROPERTIES, **antenna_properties}
) as proxy:
proxy.boot()
proxy.write_attribute("HBAT_PWR_on_RW", numpy.array([[False] * 32] * 48))
numpy.testing.assert_equal(
m_proxy.return_value.write_attribute.call_args[0][1],
data
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment