Skip to content
Snippets Groups Projects
Commit bbc59d98 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'master' into L2SS-869-improve-querying-in-stats-writer

parents 50cd3883 d6eac54d
No related branches found
No related tags found
1 merge request!426Resolve L2SS-869 "Improve querying in stats writer"
......@@ -553,7 +553,6 @@ unit_test:
- tangostationcontrol/.coverage
integration_test_docker:
stage: integration-tests
allow_failure: true
image: docker:latest
tags:
- privileged
......
......@@ -11,14 +11,18 @@
"""
# PyTango imports
from tango.server import attribute, command, Device, DeviceMeta
from tango import AttrWriteType, DevState, DebugIt, Attribute, DeviceProxy, AttrDataFormat, DevSource, DevDouble
from collections.abc import Sequence
import time
import math
from typing import List
import numpy
import textwrap
# PyTango imports
from tango.server import attribute, command, Device, DeviceMeta
from tango import AttrWriteType, DevState, DebugIt, Attribute, DeviceProxy, AttrDataFormat, DevSource, DevDouble
# Additional import
from tangostationcontrol import __version__ as version
from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
......@@ -34,6 +38,14 @@ import logging
logger = logging.getLogger()
# TODO(Corne): Remove this in L2SS-940
def sequence_not_str(obj):
"""Separate sequences / collections from str, byte or bytearray"""
return (isinstance(obj, Sequence) or isinstance(obj, numpy.ndarray)) and not \
isinstance(obj, (str, bytes, bytearray))
class lofar_device(Device, metaclass=DeviceMeta):
"""
......@@ -92,6 +104,59 @@ class lofar_device(Device, metaclass=DeviceMeta):
return self.get_state() in INITIALISED_STATES
# TODO(Corne): Actually implement locking in L2SS-940
def atomic_read_modify_write_attribute(
self, values: numpy.ndarray, proxy: DeviceProxy, attribute: str, sparse=None
):
"""Atomatically read-modify-write the attribute on the given proxy"""
current_values = proxy.read_attribute(attribute).value
self.merge_write(values, current_values, sparse)
proxy.write_attribute(attribute, values)
# TODO(Corne): Update docstring in L2SS-940
def merge_write(
self, merge_values: List[any], current_values: List[any], mask_or_sparse=None
):
"""Merge values as retrieved from :py:func:`~map_write` with current_values
This method will modify the contents of merge_values.
To be used by the :py:class:`~AntennaField` device to remove sparse fields
from mapped_values with recently retrieved current_values from RECV device.
:param merge_values: values as retrieved from :py:func:`~map_write`
:param current_values: values retrieved from RECV device on specific attribute
:param sparse: The value to identify sparse entries
"""
if mask_or_sparse is not None and sequence_not_str(mask_or_sparse):
self._merge_write_mask(
merge_values, current_values, mask_or_sparse
)
else:
self._merge_write_delimiter(
merge_values, current_values, mask_or_sparse
)
def _merge_write_delimiter(
self, merge_values: List[any], current_values: List[any], sparse=None
):
for idx, value in enumerate(merge_values):
if sequence_not_str(value):
self._merge_write_delimiter(merge_values[idx], current_values[idx], sparse)
elif value == sparse:
merge_values[idx] = current_values[idx]
def _merge_write_mask(
self, merge_values: List[any], current_values: List[any], mask: List[any]
):
for idx, value in enumerate(merge_values):
if sequence_not_str(value):
self._merge_write_mask(merge_values[idx], current_values[idx], mask[idx])
elif not mask[idx]:
merge_values[idx] = current_values[idx]
@log_exceptions()
def init_device(self):
""" Instantiates the device in the OFF state. """
......
......@@ -224,9 +224,6 @@ class DigitalBeam(beam_device):
beam_weights = self.beamlet_proxy.calculate_bf_weights(fpga_delays.flatten())
beam_weights = beam_weights.reshape((Beamlet.N_PN, Beamlet.A_PN * Beamlet.N_POL * Beamlet.N_BEAMLETS_CTRL))
# Filter out unwanted antennas (they get a weight of 0)
beam_weights *= self._map_inputs_on_polarised_inputs(self._input_select)
return beam_weights
@TimeIt()
......@@ -234,8 +231,13 @@ class DigitalBeam(beam_device):
"""
Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters)
"""
# Write weights to SDP
self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = beam_weights
self.atomic_read_modify_write_attribute(
beam_weights,
self.beamlet_proxy,
"FPGA_bf_weights_xx_yy_RW",
self._map_inputs_on_polarised_inputs(self._input_select)
)
# Record where we now point to, now that we've updated the weights.
# Only record pointings per beamlet, not which antennas took part
......
......@@ -21,22 +21,30 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
antenna_qualities_only_second = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * 94)
antenna_use_ok = numpy.array([AntennaUse.AUTO] * 96)
antennafield_iden = "STAT/AntennaField/1"
beamlet_iden = "STAT/Beamlet/1"
recv_iden = "STAT/RECV/1"
sdp_iden = "STAT/SDP/1"
def setUp(self):
"""Intentionally recreate the device object in each test"""
super().setUp("STAT/DigitalBeam/1")
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.beamlet_iden)
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.recv_iden)
self.recv_proxy = self.setup_recv_proxy()
self.beamlet_proxy = self.setup_beamlet_proxy()
def setup_recv_proxy(self):
recv_proxy = TestDeviceProxy("STAT/RECV/1")
recv_proxy = TestDeviceProxy(self.recv_iden)
recv_proxy.off()
recv_proxy.warm_boot()
recv_proxy.set_defaults()
return recv_proxy
def setup_beamlet_proxy(self):
beamlet_proxy = TestDeviceProxy("STAT/Beamlet/1")
beamlet_proxy = TestDeviceProxy(self.beamlet_iden)
beamlet_proxy.off()
beamlet_proxy.warm_boot()
beamlet_proxy.set_defaults()
......@@ -44,7 +52,7 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
def setup_sdp_proxy(self):
# setup SDP, on which this device depends
sdp_proxy = TestDeviceProxy("STAT/SDP/1")
sdp_proxy = TestDeviceProxy(self.sdp_iden)
sdp_proxy.off()
sdp_proxy.warm_boot()
sdp_proxy.set_defaults()
......@@ -53,19 +61,24 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
def setup_antennafield_proxy(self, antenna_qualities, antenna_use):
# setup AntennaField
NR_TILES = 48
antennafield_proxy = TestDeviceProxy("STAT/AntennaField/1")
antennafield_proxy = TestDeviceProxy(self.antennafield_iden)
control_mapping = [[1,i] for i in range(NR_TILES)]
antennafield_proxy.put_property({"RECV_devices": ["STAT/RECV/1"],
"Control_to_RECV_mapping": numpy.array(control_mapping).flatten(),
antennafield_proxy.put_property({"RECV_devices": [self.recv_iden],
"HBAT_Control_to_RECV_mapping": numpy.array(control_mapping).flatten(),
'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use})
antennafield_proxy.off()
antennafield_proxy.boot()
return antennafield_proxy
def test_pointing_to_zenith(self):
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden)
self.addCleanup(
TestDeviceProxy.test_device_turn_off, self.antennafield_iden
)
self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
self.setup_sdp_proxy()
self.setup_recv_proxy()
# Setup beamlet configuration
self.beamlet_proxy.clock_RW = 200 * 1000000
self.beamlet_proxy.subband_select = list(range(488))
......@@ -80,8 +93,81 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
# beam weights should now be non-zero, we don't actually check their values for correctness
self.assertNotEqual(0, sum(self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW.flatten()))
def test_set_pointing_masked_enable(self):
"""Verify that only selected inputs are written"""
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden)
self.addCleanup(
TestDeviceProxy.test_device_turn_off, self.antennafield_iden
)
self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
self.setup_sdp_proxy()
# Setup beamlet configuration
self.beamlet_proxy.clock_RW = 200 * 1000000
self.beamlet_proxy.subband_select = list(range(488))
self.proxy.initialise()
self.proxy.Tracking_enabled_RW = False
self.proxy.on()
all_zeros = numpy.array([[0] * 5856] * 16)
self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = all_zeros
# Enable all inputs
self.proxy.input_select_RW = numpy.array([[True] * 488] * 96)
self.proxy.set_pointing(
numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten()
)
# Verify all zeros are replaced with other values for all inputs
self.assertTrue(numpy.any(numpy.not_equal(
all_zeros, self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW
)))
def test_set_pointing_masked_disable(self):
"""Verify that only diabled inputs are unchanged"""
self.addCleanup(TestDeviceProxy.test_device_turn_off, self.sdp_iden)
self.addCleanup(
TestDeviceProxy.test_device_turn_off, self.antennafield_iden
)
self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
self.setup_sdp_proxy()
# Setup beamlet configuration
self.beamlet_proxy.clock_RW = 200 * 1000000
self.beamlet_proxy.subband_select = list(range(488))
self.proxy.initialise()
self.proxy.Tracking_enabled_RW = False
self.proxy.on()
non_zeros = numpy.array([[16] * 5856] * 16)
self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = non_zeros
# Disable all inputs
self.proxy.input_select_RW = numpy.array([[False] * 488] * 96)
self.proxy.set_pointing(
numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten()
)
# Verify all zeros are replaced with other values for all inputs
numpy.testing.assert_equal(
non_zeros, self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW
)
def test_input_select_with_all_antennas_ok(self):
"""Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask"""
self.addCleanup(
TestDeviceProxy.test_device_turn_off, self.antennafield_iden
)
antennafield_proxy = self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
numpy.testing.assert_equal(numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R)
self.setUp()
......@@ -93,6 +179,11 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
def test_input_select_with_only_second_antenna_ok(self):
"""Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask"""
self.addCleanup(
TestDeviceProxy.test_device_turn_off, self.antennafield_iden
)
antennafield_proxy = self.setup_antennafield_proxy(self.antenna_qualities_only_second, self.antenna_use_ok)
numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 94), antennafield_proxy.Antenna_Usage_Mask_R)
self.setUp()
......
......@@ -22,6 +22,7 @@ class TestDeviceProxy(DeviceProxy):
# See also https://www.tango-controls.org/community/forum/c/development/python/attribute-direct-reading-from-device-when-polling-is-turned-on/
self.set_source(DevSource.DEV)
@staticmethod
def test_device_turn_off(endpoint):
d = TestDeviceProxy(endpoint)
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
# Builtin regular libraries
import copy
# External regular libraries
import numpy
# Internal regular imports
from tangostationcontrol.devices.sdp import digitalbeam
# Builtin test libraries
from unittest import mock
import unittest
# External test libraries
from tango.test_context import DeviceTestContext
# Internal test imports
from tangostationcontrol.test.devices import device_base
class TestDigitalBeamDevice(device_base.DeviceTestCase):
def setUp(self):
# DeviceTestCase setUp patches lofar_device DeviceProxy
super(TestDigitalBeamDevice, self).setUp()
@unittest.skip("Test for manual use, enable at most one (process=false)")
@mock.patch.object(digitalbeam.DigitalBeam, "_wait_to_apply_weights")
@mock.patch.object(digitalbeam.DigitalBeam, "_compute_weights")
@mock.patch.object(digitalbeam, "DeviceProxy")
def test_apply_weights_disabled(self, m_proxy, m_compute, m_wait):
"""Verify won't overwrite digitalbeam data if no input_selected"""
input_data = numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten()
current_data = numpy.array([[16384] * 5856] * 16)
m_proxy.return_value = mock.Mock(
read_attribute=mock.Mock(
return_value=mock.Mock(value=copy.copy(current_data))
),
Antenna_Usage_Mask_R=numpy.array([0] * 96),
Antenna_Field_Reference_ITRF_R=mock.MagicMock(),
HBAT_reference_ITRF_R=numpy.array([[0] * 3] * 96)
)
new_data = numpy.array(
[[16384] * 2928 + [0] * 2928] * 16
)
m_compute.return_value = copy.copy(new_data)
with DeviceTestContext(
digitalbeam.DigitalBeam, process=False,
) as proxy:
proxy.initialise()
proxy.Tracking_enabled_RW = False
proxy.input_select_RW = numpy.array([[False] * 488] * 96)
proxy.set_pointing(input_data)
numpy.testing.assert_equal(
m_proxy.return_value.write_attribute.call_args[0][0],
current_data
)
@unittest.skip("Test for manual use, enable at most one (process=false)")
@mock.patch.object(digitalbeam.DigitalBeam, "_wait_to_apply_weights")
@mock.patch.object(digitalbeam.DigitalBeam, "_compute_weights")
@mock.patch.object(digitalbeam, "DeviceProxy")
def test_apply_weights_enabled(self, m_proxy, m_compute, m_wait):
"""Verify can overwrite digitalbeam data if input_selected"""
input_data = numpy.array([["AZELGEO", "0deg", "90deg"]] * 488).flatten()
current_data = numpy.array([[16384] * 5856] * 16)
m_proxy.return_value = mock.Mock(
read_attribute=mock.Mock(
return_value=mock.Mock(value=current_data)
),
Antenna_Usage_Mask_R=numpy.array([0] * 96),
Antenna_Field_Reference_ITRF_R=mock.MagicMock(),
HBAT_reference_ITRF_R=numpy.array([[0] * 3] * 96)
)
new_data = numpy.array(
[[16384] * 2928 + [0] * 2928] * 16
)
m_compute.return_value = copy.copy(new_data)
with DeviceTestContext(
digitalbeam.DigitalBeam, process=False,
) as proxy:
proxy.initialise()
proxy.Tracking_enabled_RW = False
proxy.input_select_RW = numpy.array([[True] * 488] * 96)
proxy.set_pointing(input_data)
numpy.testing.assert_equal(
m_proxy.return_value.write_attribute.call_args[0][0],
new_data
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment