Skip to content
Snippets Groups Projects
Select Git revision
  • 4c78f2f8c99ea5cfec59e6efc744e191b52cc842
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

gpu_utils.cc

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_device_antennafield.py 14.91 KiB
    #  Copyright (C)  2023 ASTRON (Netherlands Institute for Radio Astronomy)
    #  SPDX-License-Identifier: Apache-2.0
    
    import time
    
    import numpy
    from tango import DevState
    
    from tangostationcontrol.common.constants import (
        N_elements,
        MAX_ANTENNA,
        N_pol,
        N_rcu,
        N_rcu_inp,
        DEFAULT_N_HBA_TILES,
        CLK_200_MHZ,
        N_pn,
        S_pn,
    )
    from tangostationcontrol.common.frequency_bands import bands
    from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse
    
    from integration_test.device_proxy import TestDeviceProxy
    from integration_test.default.devices.base import AbstractTestBases
    
    
    class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
        def setUp(self):
            self.stationmanager_proxy = self.setup_stationmanager_proxy()
            super().setUp("STAT/AntennaField/HBA")
            self.proxy.put_property(
                {
                    "Power_to_RECV_mapping": [1, 1, 1, 0]
                    + [-1] * ((DEFAULT_N_HBA_TILES * 2) - 4),
                }
            )
            self.recv_proxy = self.setup_recv_proxy()
            self.sdpfirmware_proxy = self.setup_sdpfirmware_proxy()
            self.sdp_proxy = self.setup_sdp_proxy()
    
            self.addCleanup(self.shutdown_recv)
            self.addCleanup(self.shutdown_sdp)
    
            # configure the frequencies, which allows access
            # to the calibration attributes and commands
            self.sdpfirmware_proxy.clock_RW = CLK_200_MHZ
            self.recv_proxy.RCU_band_select_RW = [[1] * N_rcu_inp] * N_rcu
    
        def restore_antennafield(self):
            self.proxy.put_property(
                {
                    "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                    "Control_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                }
            )
    
        @staticmethod
        def shutdown_recv():
            recv_proxy = TestDeviceProxy("STAT/RECVH/1")
            recv_proxy.off()
    
        @staticmethod
        def shutdown_sdp():
            sdp_proxy = TestDeviceProxy("STAT/SDP/HBA")
            sdp_proxy.off()
    
        def setup_recv_proxy(self):
            # setup RECV
            recv_proxy = TestDeviceProxy("STAT/RECVH/1")
            recv_proxy.off()
            recv_proxy.boot()
            recv_proxy.set_defaults()
            return recv_proxy
    
        def setup_sdpfirmware_proxy(self):
            # setup SDPFirmware
            sdpfirmware_proxy = TestDeviceProxy("STAT/SDPFirmware/HBA")
            sdpfirmware_proxy.off()
            sdpfirmware_proxy.boot()
            return sdpfirmware_proxy
    
        def setup_sdp_proxy(self):
            # setup SDP
            sdp_proxy = TestDeviceProxy("STAT/SDP/HBA")
            sdp_proxy.off()
            sdp_proxy.boot()
            return sdp_proxy
    
        def setup_stationmanager_proxy(self):
            """Setup StationManager"""
            stationmanager_proxy = TestDeviceProxy("STAT/StationManager/1")
            stationmanager_proxy.off()
            stationmanager_proxy.boot()
            self.assertEqual(stationmanager_proxy.state(), DevState.ON)
            return stationmanager_proxy
    
        def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask(self):
            """Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values"""
    
            antennafield_proxy = self.proxy
            numpy.testing.assert_equal(
                numpy.array([[True] * N_rcu_inp] * N_rcu), self.recv_proxy.ANT_mask_RW
            )
    
            antenna_qualities = numpy.array([AntennaQuality.OK] * MAX_ANTENNA)
            antenna_use = numpy.array(
                [AntennaUse.ON] + [AntennaUse.AUTO] * (MAX_ANTENNA - 1)
            )
            antenna_properties = {
                "Antenna_Quality": antenna_qualities,
                "Antenna_Use": antenna_use,
            }
            mapping_properties = {
                "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                # Two inputs of recv device connected, only defined for 48 inputs
                # each pair is one input
                "Control_to_RECV_mapping": [1, 0, 1, 1]
                + [-1, -1] * (DEFAULT_N_HBA_TILES - 2),
            }
            antennafield_proxy.off()
            antennafield_proxy.put_property(antenna_properties)
            antennafield_proxy.put_property(mapping_properties)
            antennafield_proxy.boot()
            antennafield_proxy.power_hardware_on()
    
            # Verify all antennas are indicated to work
            numpy.testing.assert_equal(
                numpy.array([True] * MAX_ANTENNA), antennafield_proxy.Antenna_Usage_Mask_R
            )
    
            # Verify only connected inputs + Antenna_Usage_Mask_R are true
            # As well as dimensions of ANT_mask_RW must match control mapping
            numpy.testing.assert_equal(
                numpy.array([True] * 2 + [False] * (DEFAULT_N_HBA_TILES - 2)),
                antennafield_proxy.ANT_mask_RW,
            )
    
            # Verify recv proxy values unaffected as default for ANT_mask_RW is true
            numpy.testing.assert_equal(
                numpy.array([True] * 2 + [True] * (MAX_ANTENNA - 2)),
                self.recv_proxy.ANT_mask_RW.flatten(),
            )
    
        def test_ANT_mask_RW_configured_after_Antenna_Usage_Mask_only_one_functioning_antenna(
            self,
        ):
            """Verify if ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values (only second antenna is OK)"""
    
            antennafield_proxy = self.proxy
    
            # Broken antennas except second
            antenna_qualities = numpy.array(
                [AntennaQuality.BROKEN]
                + [AntennaQuality.OK]
                + [AntennaQuality.BROKEN] * (MAX_ANTENNA - 2)
            )
            antenna_use = numpy.array([AntennaUse.AUTO] * MAX_ANTENNA)
            antenna_properties = {
                "Antenna_Quality": antenna_qualities,
                "Antenna_Use": antenna_use,
            }
    
            # Configure control mapping to control all 96 inputs of recv device
            mapping_properties = {
                "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                "Control_to_RECV_mapping":
                # [1, 0,  1, 1,  1, 2,  1, x  ...  1, 95]
                numpy.array([[1, x] for x in range(0, MAX_ANTENNA)]).flatten(),
            }
    
            # Cycle device and set properties
            antennafield_proxy.off()
            antennafield_proxy.put_property(antenna_properties)
            antennafield_proxy.put_property(mapping_properties)
            antennafield_proxy.boot()
            antennafield_proxy.power_hardware_on()
    
            # Antenna_Usage_Mask_R should be false except one
            numpy.testing.assert_equal(
                numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)),
                antennafield_proxy.Antenna_Usage_Mask_R,
            )
            # device.power_hardware_on() writes Antenna_Usage_Mask_R to ANT_mask_RW
            numpy.testing.assert_equal(
                numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)),
                antennafield_proxy.ANT_mask_RW,
            )
            # ANT_mask_RW on antennafield writes to configured recv devices for all
            # mapped inputs
            numpy.testing.assert_equal(
                numpy.array([False] + [True] + [False] * (MAX_ANTENNA - 2)),
                self.recv_proxy.ANT_mask_RW.flatten(),
            )
    
        def test_antennafield_set_mapped_attribute_ignore_all(self):
            """Verify RECV device attribute unaffected by antennafield if not mapped"""
    
            # Connect recvh/1 device but no control inputs
            mapping_properties = {
                "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                "Control_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
            }
    
            # Cycle device an put properties
            antennafield_proxy = self.proxy
            antennafield_proxy.off()
            antennafield_proxy.put_property(mapping_properties)
            antennafield_proxy.boot()
    
            # Set HBAT_PWR_on_RW to false on recv device and read results
            self.recv_proxy.write_attribute(
                "HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
            )
            current_values = self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
    
            # write true through antennafield
            antennafield_proxy.write_attribute(
                "HBAT_PWR_on_RW", [[True] * N_elements * N_pol] * DEFAULT_N_HBA_TILES
            )
            # Test that original recv values for HBAT_PWR_on_RW match current
            numpy.testing.assert_equal(
                current_values, self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value
            )
    
            # Verify device did not enter FAULT state
            self.assertEqual(DevState.ON, antennafield_proxy.state())
    
        def test_antennafield_set_mapped_attribute(self):
            """Verify RECV device attribute changed by antennafield if mapped inputs"""
    
            mapping_properties = {
                "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                # Each pair is one mapping so 2 inputs are connected
                "Control_to_RECV_mapping": [1, 0, 1, 1]
                + [-1, -1] * (DEFAULT_N_HBA_TILES - 2),
            }
    
            antennafield_proxy = self.proxy
            antennafield_proxy.off()
            antennafield_proxy.put_property(mapping_properties)
            antennafield_proxy.boot()
    
            self.recv_proxy.write_attribute(
                "HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
            )
    
            try:
                antennafield_proxy.write_attribute(
                    "HBAT_PWR_on_RW", [[True] * N_elements * N_pol] * DEFAULT_N_HBA_TILES
                )
                numpy.testing.assert_equal(
                    numpy.array(
                        [[True] * N_elements * N_pol] * 2
                        + [[False] * N_elements * N_pol] * (MAX_ANTENNA - 2)
                    ),
                    self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value,
                )
            finally:
                # Always restore recv again
                self.recv_proxy.write_attribute(
                    "HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
                )
    
            # Verify device did not enter FAULT state
            self.assertEqual(DevState.ON, antennafield_proxy.state())
    
        def test_antennafield_set_mapped_attribute_all(self):
            """Verify RECV device attribute changed by antennafield all inputs mapped"""
    
            mapping_properties = {
                "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                "Control_to_RECV_mapping":
                # [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
                numpy.array([[1, x] for x in range(0, MAX_ANTENNA)]).flatten(),
            }
    
            antennafield_proxy = self.proxy
            antennafield_proxy.off()
            antennafield_proxy.put_property(mapping_properties)
            antennafield_proxy.boot()
    
            self.recv_proxy.write_attribute(
                "HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
            )
    
            try:
                antennafield_proxy.write_attribute(
                    "HBAT_PWR_on_RW", [[True] * N_elements * N_pol] * MAX_ANTENNA
                )
                numpy.testing.assert_equal(
                    numpy.array([[True] * N_elements * N_pol] * MAX_ANTENNA),
                    self.recv_proxy.read_attribute("HBAT_PWR_on_RW").value,
                )
            finally:
                # Always restore recv again
                self.recv_proxy.write_attribute(
                    "HBAT_PWR_on_RW", [[False] * N_elements * N_pol] * MAX_ANTENNA
                )
    
            # Verify device did not enter FAULT state
            self.assertEqual(DevState.ON, antennafield_proxy.state())
    
        def test_antennafield_set_mapped_attribute_small(self):
            """Verify small RECV device attribute changed all inputs mapped"""
    
            mapping_properties = {
                "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                "Control_to_RECV_mapping":
                # [1, 0, 1, 1, 1, 2, 1, x ... 1, 95]
                numpy.array([[1, x] for x in range(0, MAX_ANTENNA)]).flatten(),
            }
    
            antennafield_proxy = self.proxy
            antennafield_proxy.off()
            antennafield_proxy.put_property(mapping_properties)
            antennafield_proxy.boot()
    
            self.recv_proxy.write_attribute(
                "RCU_band_select_RW", [[False] * N_rcu_inp] * N_rcu
            )
    
            try:
                antennafield_proxy.write_attribute(
                    "RCU_band_select_RW", [True] * MAX_ANTENNA
                )
                numpy.testing.assert_equal(
                    numpy.array([[True] * N_rcu_inp] * N_rcu),
                    self.recv_proxy.read_attribute("RCU_band_select_RW").value,
                )
            finally:
                # Always restore recv again
                self.recv_proxy.write_attribute(
                    "RCU_band_select_RW", [[False] * N_rcu_inp] * N_rcu
                )
    
            # Verify device did not enter FAULT state
            self.assertEqual(DevState.ON, antennafield_proxy.state())
    
        def test_frequency_band(self):
            # Test whether changes in frequency band propagate properly.
            VALID_MODI = ["HBA_110_190", "HBA_170_230", "HBA_210_250"]
    
            for antenna_type in ["HBA"]:
                properties = {
                    "Antenna_Type": [antenna_type],
                    "Control_to_RECV_mapping": [1, 1, 1, 0]
                    + [-1, -1] * (DEFAULT_N_HBA_TILES - 2),
                    "Antenna_to_SDP_Mapping": [0, 1, 0, 0]
                    + [-1, -1] * (DEFAULT_N_HBA_TILES - 2),
                }
    
                antennafield_proxy = self.proxy
                antennafield_proxy.off()
                antennafield_proxy.put_property(properties)
                antennafield_proxy.boot()
    
                for band in [b for b in bands.values() if b.name in VALID_MODI]:
                    # clear downstream settings
                    self.recv_proxy.write_attribute(
                        "RCU_band_select_RW", [[0] * N_rcu_inp] * N_rcu
                    )
                    self.sdp_proxy.write_attribute("nyquist_zone_RW", [[0] * S_pn] * N_pn)
    
                    antennafield_proxy.write_attribute(
                        "Frequency_Band_RW", [band.name] * DEFAULT_N_HBA_TILES
                    )
                    # Wait for Tango attributes updating
                    time.sleep(1)
    
                    # test whether clock propagated correctly
                    numpy.testing.assert_equal(
                        band.clock, self.sdpfirmware_proxy.clock_RW, err_msg=f"{band.name}"
                    )
    
                    # test whether nyquist zone propagated correctly (for both polarisations!)
                    numpy.testing.assert_equal(
                        numpy.array(
                            [band.nyquist_zone, band.nyquist_zone] * 2
                            + [0] * (N_pn * S_pn - 4)
                        ),
                        self.sdp_proxy.read_attribute("nyquist_zone_RW").value.flatten(),
                        err_msg=f"{band.name}",
                    )
    
                    # test whether rcu filter propagated correctly
                    numpy.testing.assert_equal(
                        numpy.array(
                            [band.rcu_band, band.rcu_band] + [0] * (N_rcu_inp * N_rcu - 2)
                        ),
                        self.recv_proxy.read_attribute(
                            "RCU_band_select_RW"
                        ).value.flatten(),
                        err_msg=f"{band.name}",
                    )
    
                    # test whether reading back results in the same band for our inputs
                    numpy.testing.assert_equal(
                        numpy.array([band.name, band.name]),
                        antennafield_proxy.read_attribute("Frequency_Band_RW").value[:2],
                        err_msg=f"{band.name}",
                    )