Select Git revision
test_device_antennafield.py

L2SS-1471: Moved points available in both user and factory image to...
Jan David Mol authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_device_antennafield.py 14.91 KiB
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import time
import numpy
from tango import DevState
from tangostationcontrol.common.constants import (
N_elements,
MAX_ANTENNA,
N_pol,
N_rcu,
N_rcu_inp,
DEFAULT_N_HBA_TILES,
CLK_200_MHZ,
N_pn,
S_pn,
)
from tangostationcontrol.common.frequency_bands import bands
from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse
from integration_test.device_proxy import TestDeviceProxy
from integration_test.default.devices.base import AbstractTestBases
class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
def setUp(self):
self.stationmanager_proxy = self.setup_stationmanager_proxy()
super().setUp("STAT/AntennaField/HBA")
self.proxy.put_property(
{
"Power_to_RECV_mapping": [1, 1, 1, 0]
+ [-1] * ((DEFAULT_N_HBA_TILES * 2) - 4),
}
)
self.recv_proxy = self.setup_recv_proxy()
self.sdpfirmware_proxy = self.setup_sdpfirmware_proxy()
self.sdp_proxy = self.setup_sdp_proxy()
self.addCleanup(self.shutdown_recv)
self.addCleanup(self.shutdown_sdp)
# configure the frequencies, which allows access
# to the calibration attributes and commands
self.sdpfirmware_proxy.clock_RW = CLK_200_MHZ
self.recv_proxy.RCU_band_select_RW = [[1] * N_rcu_inp] * N_rcu
def restore_antennafield(self):
self.proxy.put_property(
{
"Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
"Control_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
}
)
@staticmethod
def shutdown_recv():
recv_proxy = TestDeviceProxy("STAT/RECVH/1")
recv_proxy.off()
@staticmethod
def shutdown_sdp():
sdp_proxy = TestDeviceProxy("STAT/SDP/HBA")
sdp_proxy.off()
def setup_recv_proxy(self):
# setup RECV
recv_proxy = TestDeviceProxy("STAT/RECVH/1")
recv_proxy.off()
recv_proxy.boot()
recv_proxy.set_defaults()
return recv_proxy
def setup_sdpfirmware_proxy(self):
# setup SDPFirmware
sdpfirmware_proxy = TestDeviceProxy("STAT/SDPFirmware/HBA")
sdpfirmware_proxy.off()
sdpfirmware_proxy.boot()
return sdpfirmware_proxy
def setup_sdp_proxy(self):
# setup SDP
sdp_proxy = TestDeviceProxy("STAT/SDP/HBA")
sdp_proxy.off()
sdp_proxy.boot()
return sdp_proxy
def setup_stationmanager_proxy(self):
"""Setup StationManager"""
stationmanager_proxy = TestDeviceProxy("STAT/StationManager/1")
stationmanager_proxy.off()
stationmanager_proxy.boot()
self.assertEqual(stationmanager_proxy.state(), DevState.ON)
return stationmanager_proxy
def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask(self):
"""Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values"""
antennafield_proxy = self.proxy
numpy.testing.assert_equal(
numpy.array([[True] * N_rcu_inp] * N_rcu), self.recv_proxy.ANT_mask_RW
)
antenna_qualities = numpy.array([AntennaQuality.OK] * MAX_ANTENNA)
antenna_use = numpy.array(
[AntennaUse.ON] + [AntennaUse.AUTO] * (MAX_ANTENNA - 1)
)
antenna_properties = {
"Antenna_Quality": antenna_qualities,
"Antenna_Use": antenna_use,
}
mapping_properties = {
"Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
# Two inputs of recv device connected, only defined for 48 inputs
# each pair is one input
"Control_to_RECV_mapping": [1, 0, 1, 1]
+ [-1, -1] * (DEFAULT_N_HBA_TILES - 2),
}
antennafield_proxy.off()
antennafield_proxy.put_property(antenna_properties)
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
antennafield_proxy.power_hardware_on()
# Verify all antennas are indicated to work
numpy.testing.assert_equal(
numpy.array([True] * MAX_ANTENNA), antennafield_proxy.Antenna_Usage_Mask_R
)
# Verify only connected inputs + Antenna_Usage_Mask_R are true
# As well as dimensions of ANT_mask_RW must match control mapping
numpy.testing.assert_equal(
numpy.array([True] * 2 + [False] * (DEFAULT_N_HBA_TILES - 2)),
antennafield_proxy.ANT_mask_RW,
)
# Verify recv proxy values unaffected as default for ANT_mask_RW is true
numpy.testing.assert_equal(
numpy.array([True] * 2 + [True] * (MAX_ANTENNA - 2)),
self.recv_proxy.ANT_mask_RW.flatten(),
)
def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask_only_one_functioning_antenna(
self,
):
"""Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values (only second antenna is OK)"""
antennafield_proxy = self.proxy
# Broken antennas except second
antenna_qualities = numpy.array(
[AntennaQuality.BROKEN]
+ [AntennaQuality.OK]
+ [AntennaQuality.BROKEN] * (MAX_ANTENNA - 2)
)
antenna_use = numpy.array([AntennaUse.AUTO] * MAX_ANTENNA)
antenna_properties = {
"Antenna_Quality": antenna_qualities,
"Antenna_Use": antenna_use,
}
# Configure control mapping to control all 96 inputs of recv device
mapping_properties = {
"Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
"Control_to_RECV_mapping":
# [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
numpy.array([[1, x] for x in range(0, MAX_ANTENNA)]).flatten(),
}
# Cycle device and set properties
antennafield_proxy.off()
antennafield_proxy.put_property(antenna_properties)
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
antennafield_proxy.power_hardware_on()
# Antenna_Usage_Mask_R should be false except one
numpy.testing.assert_equal(
numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)),
antennafield_proxy.Antenna_Usage_Mask_R,
)
# device.power_hardware_on() writes Antenna_Usage_Mask_R to ANT_mask_RW
numpy.testing.assert_equal(
numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)),
antennafield_proxy.ANT_mask_RW,
)
# ANT_mask_RW on antennafield writes to configured recv devices for all
# mapped inputs
numpy.testing.assert_equal(
numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)),
self.recv_proxy.ANT_mask_RW.flatten(),
)
def test_antennafield_set_mapped_attribute_ignore_all(self):
"""Verify RECV device attribute unaffected by antennafield if not mapped"""
# Connect recvh/1 device but no control inputs
mapping_properties = {
"Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
"Control_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
}
# Cycle device an put properties
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
# Set HBAT_PWR_on_RW to false on recv device and read results
self.recv_proxy.write_attribute(
"HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
)
current_values = self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
# write true through antennafield
antennafield_proxy.write_attribute(
"HBAT_PWR_on_RW", [[True] * N_elements * N_pol] * DEFAULT_N_HBA_TILES
)
# Test that original recv values for HBAT_PWR_on_RW match current
numpy.testing.assert_equal(
current_values, self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
def test_antennafield_set_mapped_attribute(self):
"""Verify RECV device attribute changed by antennafield if mapped inputs"""
mapping_properties = {
"Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
# Each pair is one mapping so 2 inputs are connected
"Control_to_RECV_mapping": [1, 0, 1, 1]
+ [-1, -1] * (DEFAULT_N_HBA_TILES - 2),
}
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute(
"HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
)
try:
antennafield_proxy.write_attribute(
"HBAT_PWR_on_RW", [[True] * N_elements * N_pol] * DEFAULT_N_HBA_TILES
)
numpy.testing.assert_equal(
numpy.array(
[[True] * N_elements * N_pol] * 2
+ [[False] * N_elements * N_pol] * (MAX_ANTENNA - 2)
),
self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value,
)
finally:
# Always restore recv again
self.recv_proxy.write_attribute(
"HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
def test_antennafield_set_mapped_attribute_all(self):
"""Verify RECV device attribute changed by antennafield all inputs mapped"""
mapping_properties = {
"Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
"Control_to_RECV_mapping":
# [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
numpy.array([[1, x] for x in range(0, MAX_ANTENNA)]).flatten(),
}
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute(
"HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
)
try:
antennafield_proxy.write_attribute(
"HBAT_PWR_on_RW", [[True] * N_elements * N_pol] * MAX_ANTENNA
)
numpy.testing.assert_equal(
numpy.array([[True] * N_elements * N_pol] * MAX_ANTENNA),
self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value,
)
finally:
# Always restore recv again
self.recv_proxy.write_attribute(
"HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
def test_antennafield_set_mapped_attribute_small(self):
"""Verify small RECV device attribute changed all inputs mapped"""
mapping_properties = {
"Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
"Control_to_RECV_mapping":
# [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
numpy.array([[1, x] for x in range(0, MAX_ANTENNA)]).flatten(),
}
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute(
"RCU_band_select_RW", [[False] * N_rcu_inp] * N_rcu
)
try:
antennafield_proxy.write_attribute(
"RCU_band_select_RW", [True] * MAX_ANTENNA
)
numpy.testing.assert_equal(
numpy.array([[True] * N_rcu_inp] * N_rcu),
self.recv_proxy.read_attribute("RCU_band_select_RW").value,
)
finally:
# Always restore recv again
self.recv_proxy.write_attribute(
"RCU_band_select_RW", [[False] * N_rcu_inp] * N_rcu
)
# Verify device did not enter FAULT state
self.assertEqual(DevState.ON, antennafield_proxy.state())
def test_frequency_band(self):
# Test whether changes in frequency band propagate properly.
VALID_MODI = ["HBA_110_190", "HBA_170_230", "HBA_210_250"]
for antenna_type in ["HBA"]:
properties = {
"Antenna_Type": [antenna_type],
"Control_to_RECV_mapping": [1, 1, 1, 0]
+ [-1, -1] * (DEFAULT_N_HBA_TILES - 2),
"Antenna_to_SDP_Mapping": [0, 1, 0, 0]
+ [-1, -1] * (DEFAULT_N_HBA_TILES - 2),
}
antennafield_proxy = self.proxy
antennafield_proxy.off()
antennafield_proxy.put_property(properties)
antennafield_proxy.boot()
for band in [b for b in bands.values() if b.name in VALID_MODI]:
# clear downstream settings
self.recv_proxy.write_attribute(
"RCU_band_select_RW", [[0] * N_rcu_inp] * N_rcu
)
self.sdp_proxy.write_attribute("nyquist_zone_RW", [[0] * S_pn] * N_pn)
antennafield_proxy.write_attribute(
"Frequency_Band_RW", [band.name] * DEFAULT_N_HBA_TILES
)
# Wait for Tango attributes updating
time.sleep(1)
# test whether clock propagated correctly
numpy.testing.assert_equal(
band.clock, self.sdpfirmware_proxy.clock_RW, err_msg=f"{band.name}"
)
# test whether nyquist zone propagated correctly (for both polarisations!)
numpy.testing.assert_equal(
numpy.array(
[band.nyquist_zone, band.nyquist_zone] * 2
+ [0] * (N_pn * S_pn - 4)
),
self.sdp_proxy.read_attribute("nyquist_zone_RW").value.flatten(),
err_msg=f"{band.name}",
)
# test whether rcu filter propagated correctly
numpy.testing.assert_equal(
numpy.array(
[band.rcu_band, band.rcu_band] + [0] * (N_rcu_inp * N_rcu - 2)
),
self.recv_proxy.read_attribute(
"RCU_band_select_RW"
).value.flatten(),
err_msg=f"{band.name}",
)
# test whether reading back results in the same band for our inputs
numpy.testing.assert_equal(
numpy.array([band.name, band.name]),
antennafield_proxy.read_attribute("Frequency_Band_RW").value[:2],
err_msg=f"{band.name}",
)