# Select Git revision
# test_device_observation_field.py
#
# L2SS-1509 "Move grafana rpc to opah"
# Jan David Mol authored
# Code owners
# Assign users and groups as approvers for specific file changes. Learn more.
# test_device_observation_field.py 14.11 KiB
# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import json
from datetime import datetime
from json import loads
import numpy
from integration_tests.device_proxy import TestDeviceProxy
from integration_tests.default.devices.base import TestDeviceBase
from tango import DevState
from tango import DevFailed
from tangostationcontrol.common.constants import (
N_beamlets_ctrl,
N_elements,
N_pn,
N_pol,
MAX_ANTENNA,
MAX_PARALLEL_SUBBANDS,
CS001_TILES,
)
from tangostationcontrol.devices.base_device_classes.antennafield_device import (
AntennaStatus,
AntennaUse,
)
from tangostationcontrol.test.dummy_observation_settings import (
get_observation_settings_hba_core_immediate,
)
from tangostationcontrol.common.env_decorators import (
restore_properties_for_devices,
)
class TestDeviceObservationField(TestDeviceBase):
    """Integration tests for the ``STAT/ObservationField/1`` device.

    ``setUp`` creates proxies to the HBA0 devices the tests inspect and loads
    a valid set of observation field settings, so every test can call
    ``Initialise()`` / ``On()`` on ``self.proxy`` right away.
    """

    __test__ = True

    # Value for the "Antenna_to_SDP_Mapping" property: the flattened pairs
    # (a, b) for a in 0..3 and b in 0..5, each number stored as a string.
    # NOTE(review): presumably (FPGA number, input number) pairs -- confirm.
    ANTENNA_TO_SDP_MAPPING = [
        str(value)
        for first in range(4)
        for second in range(6)
        for value in (first, second)
    ]

    antennafield_name = "STAT/AFH/HBA0"

    def test_device_write_all_attributes(self):
        """Skip the inherited test.

        Observation does not allow writing settings anymore when in ON state,
        so the inherited test cannot apply here. It is reported as skipped
        rather than silently passing.
        """
        self.skipTest(
            "Observation does not allow writing settings anymore when in ON state"
        )

    @classmethod
    def setUpClass(cls):
        """Create the ObservationControl test device once for this class."""
        obs_control = TestDeviceProxy("STAT/ObservationControl/1")
        obs_control.create_test_device()

    @classmethod
    def tearDownClass(cls):
        """Destroy the ObservationControl test device created in setUpClass."""
        obs_control = TestDeviceProxy("STAT/ObservationControl/1")
        obs_control.destroy_test_device()

    @restore_properties_for_devices([antennafield_name])
    def setUp(self):
        """Set up all proxies and load valid observation field settings."""
        super().setUp("STAT/ObservationField/1")

        # Settings of the first antenna field of the dummy HBA core
        # observation, serialised back to a JSON string.
        observation_settings = json.loads(
            get_observation_settings_hba_core_immediate().to_json()
        )
        self.VALID_JSON = json.dumps(observation_settings["antenna_fields"][0])

        self.recv_proxy = self.setup_proxy("STAT/RECVH/H0", defaults=True)
        self.sdpfirmware_proxy = self.setup_proxy("STAT/SDPFirmware/HBA0")
        self.sdp_proxy = self.setup_proxy("STAT/SDP/HBA0")
        self.sst_proxy = self.setup_proxy("STAT/SST/HBA0")
        self.xst_proxy = self.setup_proxy("STAT/XST/HBA0")
        self.antennafield_proxy = self.setup_proxy(
            self.antennafield_name, cb=self.antennafield_configure
        )
        self.beamlet_proxy = self.setup_proxy("STAT/Beamlet/HBA0", defaults=True)
        self.digitalbeam_proxy = self.setup_proxy(
            "STAT/DigitalBeam/HBA0", defaults=True
        )
        self.tilebeam_proxy = self.setup_proxy("STAT/TileBeam/HBA0", defaults=True)

        # load settings so tests (including those in super()) can
        # use self.proxy.initialise() and self.proxy.on()
        # immediately
        self.proxy.observation_field_settings_RW = self.VALID_JSON

    @staticmethod
    def antennafield_configure(proxy: TestDeviceProxy):
        """Write the antenna field properties these tests rely on.

        Passed as the ``cb`` argument of ``setup_proxy`` for the antenna
        field device in ``setUp``.
        """
        # Per tile: [1, even index] for power, [1, odd index] for control.
        power_mapping = [[1, i * 2 + 0] for i in range(CS001_TILES)]
        control_mapping = [[1, i * 2 + 1] for i in range(CS001_TILES)]
        antenna_status = numpy.array([AntennaStatus.OK] * MAX_ANTENNA)
        antenna_use = numpy.array([AntennaUse.AUTO] * MAX_ANTENNA)

        proxy.put_property(
            {
                "Power_to_RECV_mapping": numpy.array(power_mapping).flatten(),
                "Control_to_RECV_mapping": numpy.array(control_mapping).flatten(),
                "Antenna_to_SDP_Mapping": TestDeviceObservationField.ANTENNA_TO_SDP_MAPPING,
                "Antenna_Status": antenna_status,
                "Antenna_Use": antenna_use,
                "Antenna_Sets": ["INNER", "OUTER", "SPARSE_EVEN", "SPARSE_ODD", "ALL"],
                "Antenna_Set_Masks": [
                    "111111111111111111111111111111111111111111111111000000000000000000000000000000000000000000000000",
                    "000000000000000000000000000000000000000000000000111111111111111111111111111111111111111111111111",
                    "101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010",
                    "010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101",
                    "111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111",
                ],
                "HBAT_single_element_selection_GENERIC_201512": [
                    str(n % 16) for n in range(CS001_TILES)
                ],
            }
        )

    def test_device_boot(self):
        """Test if we can transition off -> on using a warm boot"""
        self.proxy.observation_field_settings_RW = self.VALID_JSON
        super().test_device_boot()

    def test_device_initialize(self):
        """Test if we can transition to standby"""
        self.proxy.observation_field_settings_RW = self.VALID_JSON
        super().test_device_initialize()

    def test_device_on(self):
        """Test if we can transition off -> standby -> on"""
        self.proxy.observation_field_settings_RW = self.VALID_JSON
        super().test_device_on()

    def test_device_read_all_attributes(self):
        """Test if we can read all of the exposed attributes in the ON state.

        This test covers the reading logic of all attributes."""
        self.proxy.observation_field_settings_RW = self.VALID_JSON
        super().test_device_read_all_attributes()

    def test_init_valid(self):
        """Initialize an observation with valid JSON"""
        self.proxy.Initialise()
        self.assertEqual(DevState.STANDBY, self.proxy.state())

    def test_init_no_settings(self):
        """Initialise must fail after an attempt to load _invalid_ JSON"""
        # Load invalid settings
        with self.assertRaises(DevFailed):
            self.proxy.observation_field_settings_RW = "{}"

        # Cannot start without valid settings
        with self.assertRaises(DevFailed):
            self.proxy.Initialise()

        # Since initialisation did not succeed, the device reports FAULT
        self.assertEqual(DevState.FAULT, self.proxy.state())

    def test_init_invalid(self):
        """Load an _invalid_ JSON"""
        # Cannot write invalid settings
        with self.assertRaises(DevFailed):
            self.proxy.observation_field_settings_RW = "{}"

        # The rejected write must not change the device state
        self.assertEqual(DevState.OFF, self.proxy.state())

    def test_prohibit_rewriting_settings(self):
        """Test that changing observation settings is disallowed once init"""
        self.proxy.Initialise()

        with self.assertRaises(DevFailed):
            self.proxy.write_attribute("observation_field_settings_RW", self.VALID_JSON)

    def test_attribute_match(self):
        """Test that JSON data is exposed to attributes"""
        # NOTE(review): marked "failing" in the original source -- confirm
        # the current status of this test.
        data = loads(self.VALID_JSON)
        first_sap = data["SAPs"][0]
        tile_beam_settings = data["HBA"]["tile_beam"]

        stop_timestamp = datetime.fromisoformat(data["stop_time"]).timestamp()
        observation_id = data["observation_id"]
        antenna_set = data["antenna_set"]
        filter_ = data["filter"]
        saps_subband = first_sap["subbands"]
        pointing_direction = first_sap["pointing"]
        # one pointing entry per subband of the first SAP
        saps_pointing = [
            (
                pointing_direction["direction_type"],
                f"{pointing_direction['angle1']}rad",
                f"{pointing_direction['angle2']}rad",
            )
        ] * len(saps_subband)
        tile_beam = [
            str(tile_beam_settings["direction_type"]),
            f"{tile_beam_settings['angle1']}rad",
            f"{tile_beam_settings['angle2']}rad",
        ]
        first_beamlet = data["first_beamlet"]
        dithering = data["dithering"]["enabled"]
        dithering_power = data["dithering"]["power"]
        dithering_frequency = data["dithering"]["frequency"]
        dab_filter = data["HBA"]["DAB_filter"]

        self.proxy.Initialise()
        self.proxy.On()

        self.assertEqual(DevState.ON, self.proxy.state())
        self.assertEqual(stop_timestamp, self.proxy.stop_time_R)
        self.assertEqual(observation_id, self.proxy.observation_id_R)
        self.assertEqual(antenna_set, self.proxy.antenna_set_R)
        self.assertEqual(filter_, self.proxy.filter_R)
        self.assertListEqual(saps_subband, self.proxy.saps_subband_R.tolist())
        self.assertListEqual(saps_pointing, list(self.proxy.saps_pointing_R))
        self.assertEqual(dab_filter, self.proxy.HBA_DAB_filter_R)
        self.assertListEqual(tile_beam, list(self.proxy.HBA_tile_beam_R))
        self.assertEqual(first_beamlet, self.proxy.first_beamlet_R)
        self.assertEqual(dithering, self.proxy.dithering_enabled_R)
        self.assertEqual(dithering_power, self.proxy.dithering_power_R)
        self.assertEqual(dithering_frequency, self.proxy.dithering_frequency_R)

    def test_apply_antennafield_settings(self):
        """Test that attribute filter is correctly applied"""
        self.setup_proxy("STAT/StationManager/1")
        antennafield_proxy = self.antennafield_proxy

        # reset the band selection to detect changes
        antennafield_proxy.RCU_band_select_RW = [[0, 0]] * CS001_TILES
        self.assertListEqual(
            antennafield_proxy.RCU_band_select_RW.tolist(),
            [[0, 0]] * CS001_TILES,
        )

        self.proxy.Initialise()
        self.proxy.On()

        expected_bands = (
            [[2, 2]] * CS001_TILES
        )  # we request every antenna to be in this band, regardless of mask
        self.assertListEqual(
            antennafield_proxy.RCU_band_select_RW.tolist(), expected_bands
        )

    def test_apply_subbands(self):
        """Test that attribute sap subbands is correctly applied"""
        beamlet_proxy = self.beamlet_proxy

        # reset the subband selection to detect changes
        subband_select = [0] * N_beamlets_ctrl
        beamlet_proxy.subband_select_RW = subband_select
        self.assertListEqual(beamlet_proxy.subband_select_RW.tolist(), subband_select)

        self.proxy.Initialise()
        self.proxy.On()

        expected_subbands = [10, 20, 30] + [0] * (N_beamlets_ctrl - 3)
        self.assertListEqual(
            beamlet_proxy.subband_select_RW.tolist(), expected_subbands
        )

    def test_apply_pointing(self):
        """Test that attribute sap pointing is correctly applied"""
        digitalbeam_proxy = self.digitalbeam_proxy

        # reset the pointings to detect changes
        default_pointing = [("AZELGEO", "0rad", "1.570796rad")] * N_beamlets_ctrl
        digitalbeam_proxy.Pointing_direction_RW = default_pointing
        self.assertListEqual(
            list(digitalbeam_proxy.Pointing_direction_RW), default_pointing
        )

        self.proxy.Initialise()
        self.proxy.On()

        # The first beamlets get the observation's pointing, all others are
        # expected to keep the default written above.
        nr_pointed = 3
        expected_pointing = [
            ("J2000", "0.0261799rad", "0rad")
        ] * nr_pointed + default_pointing[nr_pointed:]

        # Compare every beamlet actually read back from the device, instead
        # of replicating the first one and comparing constants to themselves.
        digitalbeam_pointing = [
            tuple(row) for row in digitalbeam_proxy.Pointing_direction_RW
        ]
        # Only compare the leading characters of the applied angle1;
        # presumably it is read back with more digits than listed above.
        digitalbeam_pointing[:nr_pointed] = [
            (direction_type, angle1[:9] + "rad", angle2)
            for direction_type, angle1, angle2 in digitalbeam_pointing[:nr_pointed]
        ]

        self.maxDiff = None
        self.assertListEqual(
            digitalbeam_pointing,
            expected_pointing,
        )

    def test_apply_tilebeam(self):
        """Test that attribute tilebeam is correctly applied"""
        # NOTE(review): marked "failing" in the original source -- confirm
        # the current status of this test.
        tilebeam_proxy = self.tilebeam_proxy

        # reset the pointings to detect changes
        pointing_direction = [("J2000", "0rad", "0rad")] * CS001_TILES
        tilebeam_proxy.Pointing_direction_RW = pointing_direction
        self.assertListEqual(
            list(tilebeam_proxy.Pointing_direction_RW[0]), ["J2000", "0rad", "0rad"]
        )

        self.proxy.Initialise()
        self.proxy.On()

        # Only the first tile is inspected; angle1 is cut to its leading
        # characters before comparing.
        tilebeam_directions = list(tilebeam_proxy.Pointing_direction_RW[0])
        tilebeam_directions[1] = tilebeam_directions[1][:9] + "rad"
        self.assertListEqual(
            tilebeam_directions,
            ["J2000", "0.0261799rad", "0rad"],
        )

    def test_apply_element_selection(self):
        """Test that attribute element_selection is correctly applied"""
        # NOTE(review): marked "failing" in the original source -- confirm
        # the current status of this test.
        antennafield_proxy = self.antennafield_proxy

        # start from "everything powered, no LNA on" to detect changes
        antennafield_proxy.HBAT_PWR_on_RW = numpy.ones(
            (CS001_TILES, N_elements * N_pol), dtype=bool
        )
        antennafield_proxy.HBAT_PWR_LNA_on_RW = numpy.zeros(
            (CS001_TILES, N_elements * N_pol), dtype=bool
        )

        self.proxy.Initialise()
        self.proxy.On()

        # construct an array with "True" at the expected spots
        expected = numpy.zeros((CS001_TILES, N_elements, N_pol), dtype=bool)
        for n in range(CS001_TILES):
            expected[n, n % 16, :] = True

        # collapse all values for a tile into a single dimension,
        # like HBAT_PWR_(LNA_)on_RW will return
        expected = expected.reshape(CS001_TILES, -1)

        self.assertListEqual(
            antennafield_proxy.HBAT_PWR_on_RW.tolist(), expected.tolist()
        )
        self.assertListEqual(
            antennafield_proxy.HBAT_PWR_LNA_on_RW.tolist(), expected.tolist()
        )

    def test_xst_settings(self):
        """Test that the XSTs are correctly configured"""
        # reset XST configuration to detect changes
        xst_proxy = self.xst_proxy
        xst_proxy.FPGA_xst_subband_select_RW = [
            [0] * (1 + MAX_PARALLEL_SUBBANDS)
        ] * N_pn
        xst_proxy.FPGA_xst_integration_interval_RW = [10.0] * N_pn
        xst_proxy.FPGA_xst_offload_nof_crosslets_RW = [0] * N_pn

        self.proxy.Initialise()
        self.proxy.On()

        self.assertListEqual(
            [5, 1, 2, 3, 4, 5, 0, 0] * N_pn,
            xst_proxy.FPGA_xst_subband_select_RW.flatten().tolist(),
        )
        self.assertListEqual(
            [1.0] * N_pn, xst_proxy.FPGA_xst_integration_interval_RW.tolist()
        )
        self.assertListEqual(
            [5] * N_pn, xst_proxy.FPGA_xst_offload_nof_crosslets_RW.tolist()
        )