Skip to content
Snippets Groups Projects
Select Git revision
  • faaa6527e8bf15d87b04d482e6c86356323b8ace
  • master default protected
  • gec-95-add-filter-skymodel
  • gec-105-lint-skymodel-filter
  • gec-95-update-api-docs
  • add-coverage-badge
  • releases/1.6
  • releases/1.5
  • rap-457-implement-ci-v2
  • test_heroku
  • action-python-publish
  • v1.7.1
  • v1.7.0
  • v1.6.2
  • v1.6.1
  • v1.6.post1
  • v1.6
  • v1.5.1
  • v1.5.0
  • v1.4.11
  • v1.4.10
  • v1.4.9
  • v1.4.8
  • v1.4.7
  • v1.4.6
  • v1.4.5
  • v1.4.4
  • v1.4.3
  • v1.4.2
  • v1.4.1
  • v1.4.0
31 results

tableio.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_device_calibration.py 8.12 KiB
    #  Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
    #  SPDX-License-Identifier: Apache-2.0
    
    import numpy
    from tango import DevState
    
    from tangostationcontrol.common.constants import (
        N_rcu,
        N_rcu_inp,
        DEFAULT_N_HBA_TILES,
        CLK_200_MHZ,
    )
    
    from integration_test.device_proxy import TestDeviceProxy
    from integration_test.default.devices.base import AbstractTestBases
    
    
    class TestCalibrationDevice(AbstractTestBases.TestDeviceBase):
        # Antenna names "H0" up to and including "H47"; passed to the
        # AntennaField device as its "Antenna_Names" property.
        HBA_ANTENNA_NAMES = [f"H{antenna_nr}" for antenna_nr in range(48)]
    
        def setUp(self):
            """Bring up every device the calibration tests rely on.

            Order matters: StationManager first, then the Calibration device
            through the base class, then AntennaField, RECV, SDPFirmware and SDP.
            """
            self.stationmanager_proxy = self.setup_stationmanager_proxy()
            super().setUp("STAT/Calibration/1")

            self.antennafield_proxy = self.setup_proxy("STAT/AntennaField/HBA0")
            # four explicit mapping values, every remaining entry is -1
            n_remaining = (DEFAULT_N_HBA_TILES * 2) - 4
            antennafield_properties = {
                "Power_to_RECV_mapping": [1, 1, 1, 0] + [-1] * n_remaining,
                "Antenna_Sets": ["ALL"],
                "Antenna_Set_Masks": ["1" * DEFAULT_N_HBA_TILES],
            }
            self.antennafield_proxy.put_property(antennafield_properties)

            self.recv_proxy = self.setup_recv_proxy()
            self.sdpfirmware_proxy = self.setup_sdpfirmware_proxy()
            self.sdp_proxy = self.setup_sdp_proxy()

            # switch RECV and SDP off again once the test is done
            for switch_off in (self.shutdown_recv, self.shutdown_sdp):
                self.addCleanup(switch_off)

            # The calibration attributes and commands only become accessible
            # once the frequencies (clock and RCU band) are configured.
            self.sdpfirmware_proxy.clock_RW = CLK_200_MHZ
            band_selection = [[1] * N_rcu_inp] * N_rcu
            self.recv_proxy.RCU_band_select_RW = band_selection
    
        def restore_antennafield(self):
            """Reset both RECV mappings of the AntennaField device to all -1.

            Writes ``Power_to_RECV_mapping`` and ``Control_to_RECV_mapping``,
            each holding two -1 values per HBA tile.
            """
            # These are AntennaField properties (setUp writes
            # "Power_to_RECV_mapping" to the AntennaField proxy as well), so
            # they must go to self.antennafield_proxy. self.proxy is presumably
            # the Calibration device passed to super().setUp().
            self.antennafield_proxy.put_property(
                {
                    "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                    "Control_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                }
            )
    
        @staticmethod
        def shutdown_recv():
            """Switch off the STAT/RECVH/H0 device."""
            TestDeviceProxy("STAT/RECVH/H0").off()
    
        @staticmethod
        def shutdown_sdp():
            """Switch off the STAT/SDP/HBA0 device."""
            TestDeviceProxy("STAT/SDP/HBA0").off()
    
        @staticmethod
        def shutdown(device: str):
            """Return a zero-argument callable that switches ``device`` off.

            The proxy is only created when the returned callable is invoked,
            which makes the result suitable for ``addCleanup``.
            """

            def switch_off():
                TestDeviceProxy(device).off()

            return switch_off
    
        def setup_recv_proxy(self):
            """Restart STAT/RECVH/H0, apply its defaults and return the proxy."""
            recv = TestDeviceProxy("STAT/RECVH/H0")
            # switch off before booting, then load the defaults
            recv.off()
            recv.boot()
            recv.set_defaults()
            return recv
    
        def setup_sdpfirmware_proxy(self):
            """Restart STAT/SDPFirmware/HBA0 and return its proxy."""
            sdpfirmware = TestDeviceProxy("STAT/SDPFirmware/HBA0")
            # switch off before booting
            sdpfirmware.off()
            sdpfirmware.boot()
            return sdpfirmware
    
        def setup_sdp_proxy(self):
            """Restart STAT/SDP/HBA0 and return its proxy."""
            sdp = TestDeviceProxy("STAT/SDP/HBA0")
            # switch off before booting
            sdp.off()
            sdp.boot()
            return sdp
    
        def setup_proxy(self, dev: str):
            """Restart the device named ``dev`` and return its proxy.

            A cleanup is registered that switches the device off again after
            the test.
            """
            device_proxy = TestDeviceProxy(dev)
            device_proxy.off()
            device_proxy.boot()
            switch_off = self.shutdown(dev)
            self.addCleanup(switch_off)
            return device_proxy
    
        def setup_stationmanager_proxy(self):
            """Restart STAT/StationManager/1, verify it is ON and return its proxy."""
            stationmanager = TestDeviceProxy("STAT/StationManager/1")
            stationmanager.off()
            stationmanager.boot()
            state_after_boot = stationmanager.state()
            self.assertEqual(state_after_boot, DevState.ON)
            return stationmanager
    
        def test_calibrate_recv(self):
            """Run ``calibrate_recv`` and check the resulting RCU attenuations.

            The antenna field alternates between 50m and 80m cables, so the
            attenuation is expected to alternate accordingly.
            """
            tile_numbers = range(0, DEFAULT_N_HBA_TILES)
            # [1, 0, 1, 1, 1, 2, 1, x ... 1, 47]
            control_mapping = numpy.array([[1, tile] for tile in tile_numbers]).flatten()
            # [1, 48, 1, 49, x ... 1, 95]
            power_mapping = numpy.array(
                [[1, tile + DEFAULT_N_HBA_TILES] for tile in tile_numbers]
            ).flatten()
            calibration_properties = {
                "Antenna_Type": ["HBA"],
                "Antenna_Cables": ["50m", "80m"] * (DEFAULT_N_HBA_TILES // 2),
                "Control_to_RECV_mapping": control_mapping,
                "Power_to_RECV_mapping": power_mapping,
            }

            # reconfigure the antenna field with the calibration properties
            antennafield = self.setup_proxy("STAT/AntennaField/HBA0")
            self.antennafield_proxy = antennafield
            antennafield.off()
            antennafield.put_property(calibration_properties)
            antennafield.boot()

            self.proxy.boot()

            # perform the calibration
            self.proxy.calibrate_recv("STAT/AntennaField/HBA0")

            # collect the results: column 0 is checked as "power",
            # column 1 as "control"
            attenuation_per_mapping = {
                "power": antennafield.RCU_attenuator_dB_RW[:, 0],
                "control": antennafield.RCU_attenuator_dB_RW[:, 1],
            }

            # collect the settings
            field_attenuation = antennafield.Field_Attenuation_R

            for mapping_name, rcu_attenuator_db in attenuation_per_mapping.items():
                failure_msg = f"{mapping_name} - rcu_attenuator_db={rcu_attenuator_db}"

                # antennas sharing a cable length must share a single value
                for first_antenna in (0, 1):
                    distinct_values = set(rcu_attenuator_db[first_antenna::2])
                    self.assertEqual(1, len(distinct_values), msg=failure_msg)

                # the shorter cable needs damping, so its value must be larger
                self.assertGreater(
                    rcu_attenuator_db[0], rcu_attenuator_db[1], msg=failure_msg
                )
                # the longest cable needs no damping: only the field
                # attenuation applies
                self.assertEqual(
                    field_attenuation, rcu_attenuator_db[1], msg=failure_msg
                )
    
        def test_calibrate_sdp(self):
            """Run ``calibrate_sdp`` and check the resulting signal input delays.

            Only the first two antennas are mapped onto SDP inputs; they use a
            50m and an 80m cable respectively.
            """
            tile_numbers = range(0, DEFAULT_N_HBA_TILES)
            # [1, 0, 1, 1, 1, 2, 1, x ... 1, 47]
            control_mapping = numpy.array([[1, tile] for tile in tile_numbers]).flatten()
            # [1, 48, 1, 49, x ... 1, 95]
            power_mapping = numpy.array(
                [[1, tile + DEFAULT_N_HBA_TILES] for tile in tile_numbers]
            ).flatten()
            sdp_mapping = [0, 1, 0, 0] + [-1, -1] * (DEFAULT_N_HBA_TILES - 2)
            calibration_properties = {
                "Antenna_Type": ["HBA"],
                "Antenna_Names": self.HBA_ANTENNA_NAMES,
                "Antenna_Cables": ["50m", "80m"] * (DEFAULT_N_HBA_TILES // 2),
                "Antenna_to_SDP_Mapping": sdp_mapping,
                "Control_to_RECV_mapping": control_mapping,
                "Power_to_RECV_mapping": power_mapping,
            }

            # reconfigure the antenna field with the calibration properties
            antennafield = self.setup_proxy("STAT/AntennaField/HBA0")
            self.antennafield_proxy = antennafield
            antennafield.off()
            antennafield.put_property(calibration_properties)
            antennafield.boot()

            self.proxy.boot()

            # perform the calibration
            self.proxy.calibrate_sdp("STAT/AntennaField/HBA0")

            # collect the results:
            #   antenna #0 is on FPGA 0, input 2 and 3,
            #   antenna #1 is on FPGA 0, input 0 and 1
            delays = self.sdp_proxy.FPGA_signal_input_samples_delay_RW

            # both polarisations of an antenna must get the same delay
            for first_input in (0, 2):
                self.assertEqual(delays[0, first_input], delays[0, first_input + 1])

            # antenna #0 has the shorter cable, hence the greater delay
            self.assertGreater(delays[0, 2], delays[0, 0], msg=f"{delays}")
            # antenna #1 has the longest cable, hence a delay of 0
            self.assertEqual(0, delays[0, 0])