Skip to content
Snippets Groups Projects
Select Git revision
  • 75c78bc76b65b417e2d0524dbc4b2010c9f9fa38
  • master default protected
  • L2SS-2199-apply-dab-to-xy
  • L2SS-2417-more-vector-memory
  • test-pytango-10.0.3
  • revert-cs032-ccd-ip
  • deploy-components-parallel
  • fix-chrony-exporter
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • v0.55.5-r2 protected
  • v0.52.8-rc1 protected
  • v0.55.5 protected
  • v0.55.4 protected
  • 0.55.2.dev0
  • 0.55.1.dev0
  • 0.55.0.dev0
  • v0.54.0 protected
  • 0.53.2.dev0
  • 0.53.1.dev0
  • v0.52.3-r2 protected
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
41 results

wrappers.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_device_calibration.py 8.12 KiB
    #  Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
    #  SPDX-License-Identifier: Apache-2.0
    
    import numpy
    from tango import DevState
    
    from tangostationcontrol.common.constants import (
        N_rcu,
        N_rcu_inp,
        DEFAULT_N_HBA_TILES,
        CLK_200_MHZ,
    )
    
    from integration_test.device_proxy import TestDeviceProxy
    from integration_test.default.devices.base import AbstractTestBases
    
    
    class TestCalibrationDevice(AbstractTestBases.TestDeviceBase):
        HBA_ANTENNA_NAMES = [
            "H0",
            "H1",
            "H2",
            "H3",
            "H4",
            "H5",
            "H6",
            "H7",
            "H8",
            "H9",
            "H10",
            "H11",
            "H12",
            "H13",
            "H14",
            "H15",
            "H16",
            "H17",
            "H18",
            "H19",
            "H20",
            "H21",
            "H22",
            "H23",
            "H24",
            "H25",
            "H26",
            "H27",
            "H28",
            "H29",
            "H30",
            "H31",
            "H32",
            "H33",
            "H34",
            "H35",
            "H36",
            "H37",
            "H38",
            "H39",
            "H40",
            "H41",
            "H42",
            "H43",
            "H44",
            "H45",
            "H46",
            "H47",
        ]
    
        def setUp(self):
            self.stationmanager_proxy = self.setup_stationmanager_proxy()
            super().setUp("STAT/Calibration/1")
            self.antennafield_proxy = self.setup_proxy("STAT/AntennaField/HBA0")
            self.antennafield_proxy.put_property(
                {
                    "Power_to_RECV_mapping": [1, 1, 1, 0]
                    + [-1] * ((DEFAULT_N_HBA_TILES * 2) - 4),
                    "Antenna_Sets": ["ALL"],
                    "Antenna_Set_Masks": ["1" * DEFAULT_N_HBA_TILES],
                }
            )
            self.recv_proxy = self.setup_recv_proxy()
            self.sdpfirmware_proxy = self.setup_sdpfirmware_proxy()
            self.sdp_proxy = self.setup_sdp_proxy()
    
            self.addCleanup(self.shutdown_recv)
            self.addCleanup(self.shutdown_sdp)
    
            # configure the frequencies, which allows access
            # to the calibration attributes and commands
            self.sdpfirmware_proxy.clock_RW = CLK_200_MHZ
            self.recv_proxy.RCU_band_select_RW = [[1] * N_rcu_inp] * N_rcu
    
        def restore_antennafield(self):
            self.proxy.put_property(
                {
                    "Power_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                    "Control_to_RECV_mapping": [-1, -1] * DEFAULT_N_HBA_TILES,
                }
            )
    
        @staticmethod
        def shutdown_recv():
            recv_proxy = TestDeviceProxy("STAT/RECVH/H0")
            recv_proxy.off()
    
        @staticmethod
        def shutdown_sdp():
            sdp_proxy = TestDeviceProxy("STAT/SDP/HBA0")
            sdp_proxy.off()
    
        @staticmethod
        def shutdown(device: str):
            def off():
                proxy = TestDeviceProxy(device)
                proxy.off()
    
            return off
    
        def setup_recv_proxy(self):
            # setup RECV
            recv_proxy = TestDeviceProxy("STAT/RECVH/H0")
            recv_proxy.off()
            recv_proxy.boot()
            recv_proxy.set_defaults()
            return recv_proxy
    
        def setup_sdpfirmware_proxy(self):
            # setup SDP
            sdpfirmware_proxy = TestDeviceProxy("STAT/SDPFirmware/HBA0")
            sdpfirmware_proxy.off()
            sdpfirmware_proxy.boot()
            return sdpfirmware_proxy
    
        def setup_sdp_proxy(self):
            # setup SDP
            sdp_proxy = TestDeviceProxy("STAT/SDP/HBA0")
            sdp_proxy.off()
            sdp_proxy.boot()
            return sdp_proxy
    
        def setup_proxy(self, dev: str):
            # setup SDP
            proxy = TestDeviceProxy(dev)
            proxy.off()
            proxy.boot()
            self.addCleanup(self.shutdown(dev))
            return proxy
    
        def setup_stationmanager_proxy(self):
            """Setup StationManager"""
            stationmanager_proxy = TestDeviceProxy("STAT/StationManager/1")
            stationmanager_proxy.off()
            stationmanager_proxy.boot()
            self.assertEqual(stationmanager_proxy.state(), DevState.ON)
            return stationmanager_proxy
    
        def test_calibrate_recv(self):
            calibration_properties = {
                "Antenna_Type": ["HBA"],
                "Antenna_Cables": ["50m", "80m"] * (DEFAULT_N_HBA_TILES // 2),
                "Control_to_RECV_mapping":
                # [1, 0, 1, 1, 1, 2, 1, x ... 1, 47]
                numpy.array([[1, x] for x in range(0, DEFAULT_N_HBA_TILES)]).flatten(),
                "Power_to_RECV_mapping": numpy.array(
                    [[1, x + DEFAULT_N_HBA_TILES] for x in range(0, DEFAULT_N_HBA_TILES)]
                ).flatten(),
                # [1, 48, 1, 49, x ... 1, 95]
            }
    
            self.antennafield_proxy = self.setup_proxy("STAT/AntennaField/HBA0")
            self.antennafield_proxy.off()
            self.antennafield_proxy.put_property(calibration_properties)
            self.antennafield_proxy.boot()
    
            self.proxy.boot()
    
            # calibrate
            self.proxy.calibrate_recv("STAT/AntennaField/HBA0")
    
            # check the results
            rcu_attenuator_db_pwr = self.antennafield_proxy.RCU_attenuator_dB_RW[:, 0]
            rcu_attenuator_db_ctrl = self.antennafield_proxy.RCU_attenuator_dB_RW[:, 1]
    
            # gather settings
            field_attenuation = self.antennafield_proxy.Field_Attenuation_R
    
            for mapping_name, rcu_attenuator_db in [
                ("power", rcu_attenuator_db_pwr),
                ("control", rcu_attenuator_db_ctrl),
            ]:
                # values should be the same for the same cable length
                self.assertEqual(
                    1,
                    len(set(rcu_attenuator_db[0::2])),
                    msg=f"{mapping_name} - rcu_attenuator_db={rcu_attenuator_db}",
                )
                self.assertEqual(
                    1,
                    len(set(rcu_attenuator_db[1::2])),
                    msg=f"{mapping_name} - rcu_attenuator_db={rcu_attenuator_db}",
                )
                # value should be larger for the shorter cable, as those signals need damping
                self.assertGreater(
                    rcu_attenuator_db[0],
                    rcu_attenuator_db[1],
                    msg=f"{mapping_name} - rcu_attenuator_db={rcu_attenuator_db}",
                )
                # longest cable should require no damping, so only field attenuation applies
                self.assertEqual(
                    field_attenuation,
                    rcu_attenuator_db[1],
                    msg=f"{mapping_name} - rcu_attenuator_db={rcu_attenuator_db}",
                )
    
        def test_calibrate_sdp(self):
            calibration_properties = {
                "Antenna_Type": ["HBA"],
                "Antenna_Names": self.HBA_ANTENNA_NAMES,
                "Antenna_Cables": ["50m", "80m"] * (DEFAULT_N_HBA_TILES // 2),
                "Antenna_to_SDP_Mapping": [0, 1, 0, 0]
                + [-1, -1] * (DEFAULT_N_HBA_TILES - 2),
                "Control_to_RECV_mapping":
                # [1, 0, 1, 1, 1, 2, 1, x ... 1, 47]
                numpy.array([[1, x] for x in range(0, DEFAULT_N_HBA_TILES)]).flatten(),
                "Power_to_RECV_mapping": numpy.array(
                    [[1, x + DEFAULT_N_HBA_TILES] for x in range(0, DEFAULT_N_HBA_TILES)]
                ).flatten(),
                # [1, 48, 1, 49, x ... 1, 95]
            }
    
            self.antennafield_proxy = self.setup_proxy("STAT/AntennaField/HBA0")
            self.antennafield_proxy.off()
            self.antennafield_proxy.put_property(calibration_properties)
            self.antennafield_proxy.boot()
    
            self.proxy.boot()
    
            # calibrate
            self.proxy.calibrate_sdp("STAT/AntennaField/HBA0")
    
            # check the results
            # antenna #0 is on FPGA 0, input 2 and 3,
            # antenna #1 is on FPGA 0, input 0 and 1
            signal_input_samples_delay = self.sdp_proxy.FPGA_signal_input_samples_delay_RW
    
            # delays should be equal for both polarisations
            self.assertEqual(
                signal_input_samples_delay[0, 0], signal_input_samples_delay[0, 1]
            )
            self.assertEqual(
                signal_input_samples_delay[0, 2], signal_input_samples_delay[0, 3]
            )
    
            # antenna #0 is shorter, so should have a greater delay
            self.assertGreater(
                signal_input_samples_delay[0, 2],
                signal_input_samples_delay[0, 0],
                msg=f"{signal_input_samples_delay}",
            )
            # antenna #1 is longest, so should have delay 0
            self.assertEqual(0, signal_input_samples_delay[0, 0])