Skip to content
Snippets Groups Projects
Select Git revision
  • ae0cf270fd11ef598e9a5692224e63c4aa6f5507
  • master default protected
  • L2SS-2199-apply-dab-to-xy
  • fix-durationmetric-test
  • stop-using-mesh-gateway
  • set_hba_element_power
  • test-pytango-10.0.3
  • revert-cs032-ccd-ip
  • deploy-components-parallel
  • fix-chrony-exporter
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • v0.52.8 protected
  • v0.52.7 protected
  • v0.55.5-r2 protected
  • v0.52.8-rc1 protected
  • v0.55.5 protected
  • v0.55.4 protected
  • 0.55.2.dev0
  • 0.55.1.dev0
  • 0.55.0.dev0
  • v0.54.0 protected
  • 0.53.2.dev0
  • 0.53.1.dev0
  • v0.52.3-r2 protected
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
41 results

test_device_antennafield.py

Blame
  • Jan David Mol's avatar
    L2SS-1471: Moved points available in both user and factory image to...
    Jan David Mol authored
    ae0cf270
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_device_antennafield.py 14.91 KiB
    #  Copyright (C)  2023 ASTRON (Netherlands Institute for Radio Astronomy)
    #  SPDX-License-Identifier: Apache-2.0
    
    import time
    
    import numpy
    from tango import DevState
    
    from tangostationcontrol.common.constants import (
        N_elements,
        MAX_ANTENNA,
        N_pol,
        N_rcu,
        N_rcu_inp,
        DEFAULT_N_HBA_TILES,
        CLK_200_MHZ,
        N_pn,
        S_pn,
    )
    from tangostationcontrol.common.frequency_bands import bands
    from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse
    
    from integration_test.device_proxy import TestDeviceProxy
    from integration_test.default.devices.base import AbstractTestBases
    
    
    class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
        def setUp(self):
            self.stationmanager_proxy = self.setup_stationmanager_proxy()
            super().setUp("STAT/AntennaField/HBA")
            self.proxy.put_property(
                {
                    "Power_to_RECV_mapping": [1, 1, 1, 0]
                    + [-1] * ((DEFAULT_N_HBA_TILES * 2) - 4),
                }
            )
            self.recv_proxy = self.setup_recv_proxy()
            self.sdpfirmware_proxy = self.setup_sdpfirmware_proxy()
            self.sdp_proxy = self.setup_sdp_proxy()
    
            self.addCleanup(self.shutdown_recv)
            self.addCleanup(self.shutdown_sdp)
    
            # configure the frequencies, which allows access
            # to the calibration attributes and commands
            self.sdpfirmware_proxy.clock_RW = CLK_200_MHZ
            self.recv_proxy.RCU_band_select_RW = [[1] * N_rcu_inp] * N_rcu
    
        def restore_antennafield(self):
            self.proxy.put_property(
                {
                    "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                    "Control_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                }
            )
    
        @staticmethod
        def shutdown_recv():
            recv_proxy = TestDeviceProxy("STAT/RECVH/1")
            recv_proxy.off()
    
        @staticmethod
        def shutdown_sdp():
            sdp_proxy = TestDeviceProxy("STAT/SDP/HBA")
            sdp_proxy.off()
    
        def setup_recv_proxy(self):
            # setup RECV
            recv_proxy = TestDeviceProxy("STAT/RECVH/1")
            recv_proxy.off()
            recv_proxy.boot()
            recv_proxy.set_defaults()
            return recv_proxy
    
        def setup_sdpfirmware_proxy(self):
            # setup SDPFirmware
            sdpfirmware_proxy = TestDeviceProxy("STAT/SDPFirmware/HBA")
            sdpfirmware_proxy.off()
            sdpfirmware_proxy.boot()
            return sdpfirmware_proxy
    
        def setup_sdp_proxy(self):
            # setup SDP
            sdp_proxy = TestDeviceProxy("STAT/SDP/HBA")
            sdp_proxy.off()
            sdp_proxy.boot()
            return sdp_proxy
    
        def setup_stationmanager_proxy(self):
            """Setup StationManager"""
            stationmanager_proxy = TestDeviceProxy("STAT/StationManager/1")
            stationmanager_proxy.off()
            stationmanager_proxy.boot()
            self.assertEqual(stationmanager_proxy.state(), DevState.ON)
            return stationmanager_proxy
    
        def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask(self):
            """Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values"""
    
            antennafield_proxy = self.proxy
            numpy.testing.assert_equal(
                numpy.array([[True] * N_rcu_inp] * N_rcu), self.recv_proxy.ANT_mask_RW
            )
    
            antenna_qualities = numpy.array([AntennaQuality.OK] * MAX_ANTENNA)
            antenna_use = numpy.array(
                [AntennaUse.ON] + [AntennaUse.AUTO] * (MAX_ANTENNA - 1)
            )
            antenna_properties = {
                "Antenna_Quality": antenna_qualities,
                "Antenna_Use": antenna_use,
            }
            mapping_properties = {
                "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                # Two inputs of recv device connected, only defined for 48 inputs
                # each pair is one input
                "Control_to_RECV_mapping": [1, 0, 1, 1]
                + [-1, -1] * (DEFAULT_N_HBA_TILES - 2),
            }
            antennafield_proxy.off()
            antennafield_proxy.put_property(antenna_properties)
            antennafield_proxy.put_property(mapping_properties)
            antennafield_proxy.boot()
            antennafield_proxy.power_hardware_on()
    
            # Verify all antennas are indicated to work
            numpy.testing.assert_equal(
                numpy.array([True] * MAX_ANTENNA), antennafield_proxy.Antenna_Usage_Mask_R
            )
    
            # Verify only connected inputs + Antenna_Usage_Mask_R are true
            # As well as dimensions of ANT_mask_RW must match control mapping
            numpy.testing.assert_equal(
                numpy.array([True] * 2 + [False] * (DEFAULT_N_HBA_TILES - 2)),
                antennafield_proxy.ANT_mask_RW,
            )
    
            # Verify recv proxy values unaffected as default for ANT_mask_RW is true
            numpy.testing.assert_equal(
                numpy.array([True] * 2 + [True] * (MAX_ANTENNA - 2)),
                self.recv_proxy.ANT_mask_RW.flatten(),
            )
    
        def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask_only_one_functioning_antenna(
            self,
        ):
            """Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values (only second antenna is OK)"""
    
            antennafield_proxy = self.proxy
    
            # Broken antennas except second
            antenna_qualities = numpy.array(
                [AntennaQuality.BROKEN]
                + [AntennaQuality.OK]
                + [AntennaQuality.BROKEN] * (MAX_ANTENNA - 2)
            )
            antenna_use = numpy.array([AntennaUse.AUTO] * MAX_ANTENNA)
            antenna_properties = {
                "Antenna_Quality": antenna_qualities,
                "Antenna_Use": antenna_use,
            }
    
            # Configure control mapping to control all 96 inputs of recv device
            mapping_properties = {
                "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                "Control_to_RECV_mapping":
                # [1, 0,  1, 1,  1, 2,  1, x  ...  1, 95]
                numpy.array([[1, x] for x in range(0, MAX_ANTENNA)]).flatten(),
            }
    
            # Cycle device and set properties
            antennafield_proxy.off()
            antennafield_proxy.put_property(antenna_properties)
            antennafield_proxy.put_property(mapping_properties)
            antennafield_proxy.boot()
            antennafield_proxy.power_hardware_on()
    
            # Antenna_Usage_Mask_R should be false except one
            numpy.testing.assert_equal(
                numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)),
                antennafield_proxy.Antenna_Usage_Mask_R,
            )
            # device.power_hardware_on() writes Antenna_Usage_Mask_R to ANT_mask_RW
            numpy.testing.assert_equal(
                numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)),
                antennafield_proxy.ANT_mask_RW,
            )
            # ANT_mask_RW on antennafield writes to configured recv devices for all
            # mapped inputs
            numpy.testing.assert_equal(
                numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)),
                self.recv_proxy.ANT_mask_RW.flatten(),
            )
    
        def test_antennafield_set_mapped_attribute_ignore_all(self):
            """Verify RECV device attribute unaffected by antennafield if not mapped"""
    
            # Connect recvh/1 device but no control inputs
            mapping_properties = {
                "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                "Control_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
            }
    
            # Cycle device an put properties
            antennafield_proxy = self.proxy
            antennafield_proxy.off()
            antennafield_proxy.put_property(mapping_properties)
            antennafield_proxy.boot()
    
            # Set HBAT_PWR_on_RW to false on recv device and read results
            self.recv_proxy.write_attribute(
                "HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
            )
            current_values = self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
    
            # write true through antennafield
            antennafield_proxy.write_attribute(
                "HBAT_PWR_on_RW", [[True] * N_elements * N_pol] * DEFAULT_N_HBA_TILES
            )
            # Test that original recv values for HBAT_PWR_on_RW match current
            numpy.testing.assert_equal(
                current_values, self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
            )
    
            # Verify device did not enter FAULT state
            self.assertEqual(DevState.ON, antennafield_proxy.state())
    
        def test_antennafield_set_mapped_attribute(self):
            """Verify RECV device attribute changed by antennafield if mapped inputs"""
    
            mapping_properties = {
                "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                # Each pair is one mapping so 2 inputs are connected
                "Control_to_RECV_mapping": [1, 0, 1, 1]
                + [-1, -1] * (DEFAULT_N_HBA_TILES - 2),
            }
    
            antennafield_proxy = self.proxy
            antennafield_proxy.off()
            antennafield_proxy.put_property(mapping_properties)
            antennafield_proxy.boot()
    
            self.recv_proxy.write_attribute(
                "HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
            )
    
            try:
                antennafield_proxy.write_attribute(
                    "HBAT_PWR_on_RW", [[True] * N_elements * N_pol] * DEFAULT_N_HBA_TILES
                )
                numpy.testing.assert_equal(
                    numpy.array(
                        [[True] * N_elements * N_pol] * 2
                        + [[False] * N_elements * N_pol] * (MAX_ANTENNA - 2)
                    ),
                    self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value,
                )
            finally:
                # Always restore recv again
                self.recv_proxy.write_attribute(
                    "HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
                )
    
            # Verify device did not enter FAULT state
            self.assertEqual(DevState.ON, antennafield_proxy.state())
    
        def test_antennafield_set_mapped_attribute_all(self):
            """Verify RECV device attribute changed by antennafield all inputs mapped"""
    
            mapping_properties = {
                "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                "Control_to_RECV_mapping":
                # [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
                numpy.array([[1, x] for x in range(0, MAX_ANTENNA)]).flatten(),
            }
    
            antennafield_proxy = self.proxy
            antennafield_proxy.off()
            antennafield_proxy.put_property(mapping_properties)
            antennafield_proxy.boot()
    
            self.recv_proxy.write_attribute(
                "HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
            )
    
            try:
                antennafield_proxy.write_attribute(
                    "HBAT_PWR_on_RW", [[True] * N_elements * N_pol] * MAX_ANTENNA
                )
                numpy.testing.assert_equal(
                    numpy.array([[True] * N_elements * N_pol] * MAX_ANTENNA),
                    self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value,
                )
            finally:
                # Always restore recv again
                self.recv_proxy.write_attribute(
                    "HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
                )
    
            # Verify device did not enter FAULT state
            self.assertEqual(DevState.ON, antennafield_proxy.state())
    
        def test_antennafield_set_mapped_attribute_small(self):
            """Verify small RECV device attribute changed all inputs mapped"""
    
            mapping_properties = {
                "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                "Control_to_RECV_mapping":
                # [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
                numpy.array([[1, x] for x in range(0, MAX_ANTENNA)]).flatten(),
            }
    
            antennafield_proxy = self.proxy
            antennafield_proxy.off()
            antennafield_proxy.put_property(mapping_properties)
            antennafield_proxy.boot()
    
            self.recv_proxy.write_attribute(
                "RCU_band_select_RW", [[False] * N_rcu_inp] * N_rcu
            )
    
            try:
                antennafield_proxy.write_attribute(
                    "RCU_band_select_RW", [True] * MAX_ANTENNA
                )
                numpy.testing.assert_equal(
                    numpy.array([[True] * N_rcu_inp] * N_rcu),
                    self.recv_proxy.read_attribute("RCU_band_select_RW").value,
                )
            finally:
                # Always restore recv again
                self.recv_proxy.write_attribute(
                    "RCU_band_select_RW", [[False] * N_rcu_inp] * N_rcu
                )
    
            # Verify device did not enter FAULT state
            self.assertEqual(DevState.ON, antennafield_proxy.state())
    
        def test_frequency_band(self):
            # Test whether changes in frequency band propagate properly.
            VALID_MODI = ["HBA_110_190", "HBA_170_230", "HBA_210_250"]
    
            for antenna_type in ["HBA"]:
                properties = {
                    "Antenna_Type": [antenna_type],
                    "Control_to_RECV_mapping": [1, 1, 1, 0]
                    + [-1, -1] * (DEFAULT_N_HBA_TILES - 2),
                    "Antenna_to_SDP_Mapping": [0, 1, 0, 0]
                    + [-1, -1] * (DEFAULT_N_HBA_TILES - 2),
                }
    
                antennafield_proxy = self.proxy
                antennafield_proxy.off()
                antennafield_proxy.put_property(properties)
                antennafield_proxy.boot()
    
                for band in [b for b in bands.values() if b.name in VALID_MODI]:
                    # clear downstream settings
                    self.recv_proxy.write_attribute(
                        "RCU_band_select_RW", [[0] * N_rcu_inp] * N_rcu
                    )
                    self.sdp_proxy.write_attribute("nyquist_zone_RW", [[0] * S_pn] * N_pn)
    
                    antennafield_proxy.write_attribute(
                        "Frequency_Band_RW", [band.name] * DEFAULT_N_HBA_TILES
                    )
                    # Wait for Tango attributes updating
                    time.sleep(1)
    
                    # test whether clock propagated correctly
                    numpy.testing.assert_equal(
                        band.clock, self.sdpfirmware_proxy.clock_RW, err_msg=f"{band.name}"
                    )
    
                    # test whether nyquist zone propagated correctly (for both polarisations!)
                    numpy.testing.assert_equal(
                        numpy.array(
                            [band.nyquist_zone, band.nyquist_zone] * 2
                            + [0] * (N_pn * S_pn - 4)
                        ),
                        self.sdp_proxy.read_attribute("nyquist_zone_RW").value.flatten(),
                        err_msg=f"{band.name}",
                    )
    
                    # test whether rcu filter propagated correctly
                    numpy.testing.assert_equal(
                        numpy.array(
                            [band.rcu_band, band.rcu_band] + [0] * (N_rcu_inp * N_rcu - 2)
                        ),
                        self.recv_proxy.read_attribute(
                            "RCU_band_select_RW"
                        ).value.flatten(),
                        err_msg=f"{band.name}",
                    )
    
                    # test whether reading back results in the same band for our inputs
                    numpy.testing.assert_equal(
                        numpy.array([band.name, band.name]),
                        antennafield_proxy.read_attribute("Frequency_Band_RW").value[:2],
                        err_msg=f"{band.name}",
                    )