Skip to content
Snippets Groups Projects
Commit fc83551f authored by Corné Lukken's avatar Corné Lukken
Browse files

L2SS-876: Add tests for retaining values on recv from antennafield

Antennafield configures values for recv devices. This patch
ensures that the merging of values is correct as only controlled
inputs should be modified.
parent 07362df4
Branches
Tags
1 merge request!408L2SS-876: Get current RECV values before updating through antennafield
# -*- coding: utf-8 -*-
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from collections.abc import Sequence
import numpy
def sequence_not_str(obj):
"""Separate sequences / collections from str, byte or bytearray"""
return (isinstance(obj, Sequence) or isinstance(obj, numpy.ndarray)) and not \
isinstance(obj, (str, bytes, bytearray))
......@@ -16,6 +16,7 @@ from tango import DeviceProxy, DevSource, AttrWriteType, DevVarFloatArray, DevVa
from tango.server import device_property, attribute, command
# Additional import
from tangostationcontrol.common.type_checking import sequence_not_str
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.devices.lofar_device import lofar_device
from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions
......@@ -404,13 +405,14 @@ class AntennaField(lofar_device):
and `write_attribute`!
"""
mapped_value = self.__mapper.map_write(mapped_point, value)
for idx, recv_proxy in enumerate(self.recv_proxies):
new_values = mapped_value[idx]
# TODO(Corne): Resolve potential lost update race condition
current_values = recv_proxy.read_attribute(mapped_point)
current_values = recv_proxy.read_attribute(mapped_point).value
self.__mapper.merge_write(new_values, current_values)
recv_proxy.write_attribute(mapped_point, new_values)
......@@ -550,8 +552,7 @@ class AntennaToRecvMapper(object):
return mapped_values
@staticmethod
def merge_write(merge_values: List[any], current_values: List[any]):
def merge_write(self, merge_values: List[any], current_values: List[any]):
"""Merge values as retrieved from :py:func:`~map_write` with current_values
This method will modify the contents of merge_values.
......@@ -564,7 +565,9 @@ class AntennaToRecvMapper(object):
"""
for idx, value in enumerate(merge_values):
if value is None:
if sequence_not_str(value):
self.merge_write(merge_values[idx], current_values[idx])
elif value is None:
merge_values[idx] = current_values[idx]
def _mapped_r_values(self, recv_results: List[any], default_values: List[any]):
......
......@@ -23,7 +23,7 @@ These arguments and modules can also be passed at the level of the Makefile
instead of through tox directly:
```shell
make integration default import.path.class.functionname`
make integration import.path.class.functionname`
```
## Breakpoints & Debuggers with Integration Tests
......
......@@ -7,12 +7,14 @@
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from tango._tango import DevState
import numpy
from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy
from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse
from .base import AbstractTestBases
class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
def setUp(self):
......@@ -21,6 +23,21 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
"Power_to_RECV_mapping": [1, 1, 1, 0] + [-1] * 92})
self.recv_proxy = self.setup_recv_proxy()
self.addCleanup(self.restore_antennafield)
self.addCleanup(self.shutdown_recv, self.recv_proxy)
def restore_antennafield(self):
self.proxy.put_property({
"RECV_devices": ["STAT/RECV/1"],
"HBAT_Power_to_RECV_mapping": [-1] * 96,
"HBAT_Control_to_RECV_mapping": [-1] * 96
})
@staticmethod
def shutdown_recv(recv_proxy):
recv_proxy = TestDeviceProxy("STAT/RECV/1")
recv_proxy.off()
def setup_recv_proxy(self):
# setup RECV
recv_proxy = TestDeviceProxy("STAT/RECV/1")
......@@ -35,9 +52,9 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask(self):
""" Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values"""
recv_proxy = self.setup_recv_proxy()
antennafield_proxy = self.proxy
numpy.testing.assert_equal(numpy.array([True] * 96), recv_proxy.ANT_mask_RW)
numpy.testing.assert_equal(numpy.array([True] * 96), self.recv_proxy.ANT_mask_RW)
antenna_qualities = numpy.array([AntennaQuality.OK] * 96)
antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * 95)
......@@ -51,11 +68,11 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
antennafield_proxy.boot() # initialise hardware values as well
numpy.testing.assert_equal(numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R)
numpy.testing.assert_equal(numpy.array([True] * 2 + [False] * 46), antennafield_proxy.ANT_mask_RW)
numpy.testing.assert_equal(numpy.array([True] * 2 + [False] * 46 + [False] * 48), recv_proxy.ANT_mask_RW)
numpy.testing.assert_equal(numpy.array([True] * 2 + [False] * 46 + [False] * 48), self.recv_proxy.ANT_mask_RW)
def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask_only_one_functioning_antenna(self):
"""Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values (only second antenna is OK)"""
recv_proxy = self.setup_recv_proxy()
antennafield_proxy = self.proxy
antenna_qualities = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * 94)
antenna_use = numpy.array([AntennaUse.AUTO] * 96)
......@@ -69,4 +86,134 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
antennafield_proxy.boot() # initialise hardware values as well
numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 94), antennafield_proxy.Antenna_Usage_Mask_R)
numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 46), antennafield_proxy.ANT_mask_RW)
numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 46 + [False] * 48), recv_proxy.ANT_mask_RW)
numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 46 + [False] * 48), self.recv_proxy.ANT_mask_RW)
def test_antennafield_set_mapped_attribute_ignore_all(self):
"""Verify RECV device attribute unaffected by antennafield if not mapped"""
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"HBAT_Power_to_RECV_mapping": [-1, -1] * 48,
"HBAT_Control_to_RECV_mapping": [-1, -1] * 48
}
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96)
current_values = self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
antennafield_proxy.write_attribute("HBAT_PWR_on_RW", [[True] * 32] * 48)
numpy.testing.assert_equal(
current_values,
self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
def test_antennafield_set_mapped_attribute(self):
"""Verify RECV device attribute changed by antennafield if mapped inputs"""
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"HBAT_Power_to_RECV_mapping": [-1, -1] * 48,
# Each pair is one mapping so 2 inputs are connected
"HBAT_Control_to_RECV_mapping": [1, 0, 1, 1] + [-1, -1] * 46
}
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96)
try:
antennafield_proxy.write_attribute(
"HBAT_PWR_on_RW", [[True] * 32] * 48
)
numpy.testing.assert_equal(
numpy.array([[True] * 32] * 2 + [[False] * 32] * 94),
self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
)
finally:
# Always disable recv again
self.recv_proxy.write_attribute(
"HBAT_PWR_on_RW", [[False] * 32] * 96
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
def test_antennafield_set_mapped_attribute_all(self):
"""Verify RECV device attribute changed by antennafield all inputs mapped"""
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"HBAT_Power_to_RECV_mapping": [-1, -1] * 48,
"HBAT_Control_to_RECV_mapping":
# [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
numpy.array([[1, x] for x in range(0, 96)]).flatten()
}
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 96)
try:
antennafield_proxy.write_attribute(
"HBAT_PWR_on_RW", [[True] * 32] * 96
)
numpy.testing.assert_equal(
numpy.array([[True] * 32] * 96),
self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
)
finally:
# Always disable recv again
self.recv_proxy.write_attribute(
"HBAT_PWR_on_RW", [[False] * 32] * 96
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
def test_antennafield_set_mapped_attribute_small(self):
"""Verify small RECV device attribute changed all inputs mapped"""
mapping_properties = {
"RECV_devices": ["STAT/RECV/1"],
"HBAT_Power_to_RECV_mapping": [-1, -1] * 48,
"HBAT_Control_to_RECV_mapping":
# [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
numpy.array([[1, x] for x in range(0, 96)]).flatten()
}
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute("RCU_band_select_RW", [False] * 96)
try:
antennafield_proxy.write_attribute(
"RCU_band_select_RW", [True] * 96
)
numpy.testing.assert_equal(
numpy.array([True] * 96),
self.recv_proxy.read_attribute("RCU_band_select_RW").value
)
finally:
# Always disable recv again
self.recv_proxy.write_attribute(
"RCU_band_select_RW", [False] * 96
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
......@@ -22,7 +22,7 @@ logger = logging.getLogger()
class TestRecvCluster(base.IntegrationTestCase):
# The AntennaField is setup with self.NR_TILES tiles in the test configuration
NR_TILES = 48
POINTING_DIRECTION = numpy.array([["J2000", "0deg", "0deg"]] * NR_TILES).flatten()
......
......@@ -7,6 +7,10 @@
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import time
import statistics
import logging
import numpy
from tango.test_context import DeviceTestContext
......@@ -16,6 +20,8 @@ from tangostationcontrol.devices.antennafield import AntennaToRecvMapper, Antenn
from tangostationcontrol.test import base
from tangostationcontrol.test.devices import device_base
logger = logging.getLogger()
class TestAntennaToRecvMapper(base.TestCase):
......@@ -315,7 +321,6 @@ class TestAntennaToRecvMapper(base.TestCase):
def test_map_write_pwr_on_rw_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [[None] * 32] * 48
expected = [[[None] * 32] * 96]
actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
......@@ -337,20 +342,55 @@ class TestAntennaToRecvMapper(base.TestCase):
actual = mapper.map_write("HBAT_PWR_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_merge_write(self):
"""Verify all None fields are replaced by merge_write if no control"""
mapper = HBATToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1
)
merge_values = [[None] * 32] * 96
current_values = [[False] * 32] * 96
mapper.merge_write(merge_values, current_values)
numpy.testing.assert_equal(merge_values, current_values)
results = []
for _i in range(25):
start_time = time.monotonic_ns()
mapper.merge_write(merge_values, current_values)
stop_time = time.monotonic_ns()
results.append(stop_time - start_time)
logging.error(
f"Merge write performance: Median {statistics.median(results) / 1.e9} "
f"Stdev {statistics.stdev(results) / 1.e9}"
)
def test_merge_write_values(self):
"""Verify all fields with values are retained by merge_write"""
mapper = HBATToRecvMapper(
self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1
)
merge_values = [[True] * 32] * 2 + [[None] * 32] * 94
current_values = [[True] * 32] * 2 + [[False] * 32] * 94
mapper.merge_write(merge_values, current_values)
numpy.testing.assert_equal(merge_values, current_values)
class TestAntennafieldDevice(device_base.DeviceTestCase):
# some dummy values for mandatory properties
AT_PROPERTIES = {'OPC_Server_Name': 'example.com', 'OPC_Server_Port': 4840, 'OPC_Time_Out': 5.0,
'Antenna_Field_Reference_ITRF' : [3.0, 3.0, 3.0], 'Antenna_Field_Reference_ETRS' : [7.0, 7.0, 7.0]}
# A mapping where Antennas are all not mapped to power RCUs
POWER_NOT_CONNECTED = [[-1, -1]] * 48
# A mapping where Antennas are all not mapped to control RCUs
CONTROL_NOT_CONNECTED = [[-1, -1]] * 48
# A mapping where first two Antennas are mapped on the first Receiver.
# The first Antenna control line on RCU 1 and the second Antenna control line on RCU 0.
CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * 46
AT_PROPERTIES = {
'OPC_Server_Name': 'example.com',
'OPC_Server_Port': 4840,
'OPC_Time_Out': 5.0,
'Antenna_Field_Reference_ITRF' : [3.0, 3.0, 3.0],
'Antenna_Field_Reference_ETRS' : [7.0, 7.0, 7.0],
}
def setUp(self):
# DeviceTestCase setUp patches lofar_device DeviceProxy
......@@ -401,3 +441,31 @@ class TestAntennafieldDevice(device_base.DeviceTestCase):
with DeviceTestContext(antennafield.AntennaField, properties={**self.AT_PROPERTIES, **antenna_properties}, process=True) as proxy:
for i in range(len(antenna_names)):
self.assertTrue(proxy.Antenna_Names_R[i]==f"C{i}")
# @mock.patch.object(antennafield, "DeviceProxy")
# def test_set_mapped_attribute(self, m_proxy):
# """Verify set_mapped_attribute only modifies controlled inputs"""
#
# antenna_properties = {
# 'RECV_devices': ['stat/RECV/1'],
# }
#
# data = numpy.array([[False] * 32] * 96)
#
# m_proxy.return_value = mock.Mock(
# read_attribute=mock.Mock(
# return_value=mock.Mock(value=data)
# )
# )
#
# with DeviceTestContext(
# antennafield.AntennaField, process=False,
# properties={**self.AT_PROPERTIES, **antenna_properties}
# ) as proxy:
# proxy.boot()
# proxy.write_attribute("HBAT_PWR_on_RW", [[False] * 32] * 48)
#
# numpy.testing.assert_equal(
# m_proxy.return_value.write_attribute.call_args.args[1],
# data
# )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment