Skip to content
Snippets Groups Projects
Select Git revision
  • b9a56a619684134f6d3f5e6ce2285e6840da00a6
  • master default protected
  • control-single-hba-and-lba
  • stabilise-landing-page
  • all-stations-lofar2
  • L2SS-2357-fix-ruff
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • fix-build-dind
  • L2SS-2153--Improve-Error-Handling
  • L2SS-2153-Add-Grpc-Gateway-support
  • L2SS-1970-apsct-lol
  • DNM-pytango10.0.1rc1-test
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
  • v0.52.1-rc1 protected
  • v0.51.9-6 protected
  • v0.51.9-5 protected
  • v0.51.9-4 protected
  • v0.51.9-3 protected
  • v0.51.9-2 protected
  • v0.51.9-1 protected
  • v0.51.9 protected
  • v0.51.8 protected
  • v0.39.15-wsrttwo protected
  • v0.39.15-wsrt protected
41 results

test_calibration.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_calibration.py 11.37 KiB
    #  Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
    #  SPDX-License-Identifier: Apache-2.0
    import os
    from os import path
    from unittest.mock import patch, Mock, call, PropertyMock
    from test import base
    
    import numpy
    from numpy.testing import assert_array_equal
    
    from tangostationcontrol.common import consul
    from tangostationcontrol.common.calibration import (
        delay_compensation,
        loss_compensation,
        dB_to_factor,
        CalibrationManager,
        CalibrationTable,
    )
    from tangostationcontrol.common.constants import S_pn, N_subbands, N_pn, SDP_UNIT_WEIGHT
    from tangostationcontrol.common.sdp import complex_to_weights
    
    
    class MockMinio:
        def __init__(self, **kwargs):
            self.args = kwargs
    
    
    def new_lookup_service(*args, **kwargs):
        yield consul.Service(host="test", port=9000, addr="test")
    
    
    @patch("tangostationcontrol.common.calibration.Minio")
    @patch("tangostationcontrol.common.consul.lookup_service", new=new_lookup_service)
    @patch.dict(
        os.environ,
        {"MINIO_ROOT_USER": "my_user", "MINIO_ROOT_PASSWORD": "my_passwd"},
        clear=True,
    )
    class TestCalibrationManager(base.TestCase):
        """Test class for Calibration Manager"""
    
        def test_sync_calibration_tables(self, minio):
            """Test whether calibration table files are correctly retrieved"""
            minio.return_value.list_objects.return_value = [
                Mock(object_name="/unittest-station/file1.h5"),
                Mock(object_name="/unittest-station/file2.h5"),
                Mock(object_name="/unittest-station/file3.h5"),
            ]
            sut = CalibrationManager(
                "http://server:1234/test_bucket/test_prefix", "unittest-station"
            )
            minio.has_call_with(
                "server:1234", access_key="my_user", secret_key="my_passwd", secure=False
            )
            minio.return_value.list_objects.has_call_with(
                "test_bucket", prefix="test_prefix/unittest-station/"
            )
            minio.return_value.fget_object.assert_has_calls(
                [
                    call(
                        "test_bucket",
                        "/unittest-station/file1.h5",
                        path.join(sut._tmp_dir.name, "file1.h5"),
                    ),
                    call(
                        "test_bucket",
                        "/unittest-station/file2.h5",
                        path.join(sut._tmp_dir.name, "file2.h5"),
                    ),
                    call(
                        "test_bucket",
                        "/unittest-station/file3.h5",
                        path.join(sut._tmp_dir.name, "file3.h5"),
                    ),
                ]
            )
    
        def _setup_mock_antennafield(self):
            return Mock(
                Antenna_to_SDP_Mapping_R=numpy.array(
                    [[1, 1], [1, 2]], dtype=numpy.int32
                ).reshape(-1, 2),
                Antenna_Names_R=[f"T{n + 1}" for n in range(2)],
                RCU_band_select_RW=numpy.array([[1, 1], [2, 2]]),
                antenna_type_R="HBA",
                **{"name.return_value": "Stat/AFH/HBA0"},
            )
    
        def _setup_mock_caltable(self):
            return Mock(
                observation_station="unittest-station",
                antennas={
                    "T1": Mock(x=numpy.arange(0, 512), y=numpy.arange(512, 1024)),
                    "T2": Mock(x=numpy.arange(1024, 1536), y=numpy.arange(1536, 2048)),
                },
            )
    
        def _test_calibration(self, nyquist_zone, hdf_reader):
            """Common logic of calibration tests"""
            antenna_field_mock = self._setup_mock_antennafield()
            subband_weights = numpy.array([[SDP_UNIT_WEIGHT] * S_pn * N_subbands] * N_pn)
    
            def subband_weights_side_effect(new_value=None):
                nonlocal subband_weights
                if new_value is not None:
                    subband_weights = new_value
                return subband_weights
    
            sdp_mock = Mock(
                nyquist_zone_RW=nyquist_zone,
                FPGA_spectral_inversion_R=numpy.array([[0] * N_pn] * S_pn),
            )
            subband_property_mock = PropertyMock(side_effect=subband_weights_side_effect)
            type(sdp_mock).FPGA_subband_weights_RW = subband_property_mock
            caltable_mock = self._setup_mock_caltable()
            hdf_reader.return_value.__enter__.return_value = caltable_mock
    
            sut = CalibrationManager("http://server:1234", "unittest-station")
            sut.calibrate_subband_weights(antenna_field_mock, sdp_mock)
            hdf_reader.assert_has_calls(
                [
                    call(
                        f"{sut._tmp_dir.name}/CalTable-unittest-station-HBA-200MHz.h5",
                        CalibrationTable,
                    ),
                    call().__enter__(),
                    call().__exit__(None, None, None),
                    call(
                        f"{sut._tmp_dir.name}/CalTable-unittest-station-HBA-150MHz.h5",
                        CalibrationTable,
                    ),
                    call().__enter__(),
                    call().__exit__(None, None, None),
                ]
            )
            return subband_weights
    
        @patch("tangostationcontrol.common.calibration.read_hdf5")
        def test_calibrate_subband_weights(self, hdf_reader, _):
            """Test whether calibration values are correctly applied to weights"""
            nyquist_zone = numpy.array([[0] * N_pn] * S_pn)
            subband_weights = self._test_calibration(nyquist_zone, hdf_reader)
    
            assert_array_equal(
                subband_weights[1, 1024:1536],
                complex_to_weights(numpy.arange(0, 512)),
            )
            assert_array_equal(
                subband_weights[1, 1536:2048],
                complex_to_weights(numpy.arange(512, 1024)),
            )
            assert_array_equal(
                subband_weights[1, 2048:2560],
                complex_to_weights(numpy.arange(1024, 1536)),
            )
            assert_array_equal(
                subband_weights[1, 2560:3072],
                complex_to_weights(numpy.arange(1536, 2048)),
            )
    
        @patch("tangostationcontrol.common.calibration.read_hdf5")
        def test_calibrate_reverse_order(self, hdf_reader, _):
            """Test whether calibration values are applied in decreasing order
            when subband frequencies are decreasing"""
            nyquist_zone = numpy.array(
                [[0] * N_pn] + [[1] * N_pn] + [[0] * N_pn] * (S_pn - 2)
            )
            subband_weights = self._test_calibration(nyquist_zone, hdf_reader)
    
            assert_array_equal(
                subband_weights[1, 1024:1536],
                complex_to_weights(numpy.array(range(511, -1, -1))),
            )
            assert_array_equal(
                subband_weights[1, 1536:2048],
                complex_to_weights(numpy.array(range(1023, 511, -1))),
            )
            assert_array_equal(
                subband_weights[1, 2048:2560],
                complex_to_weights(numpy.array(range(1535, 1023, -1))),
            )
            assert_array_equal(
                subband_weights[1, 2560:3072],
                complex_to_weights(numpy.array(range(2047, 1535, -1))),
            )
    
    
    class TestCalibration(base.TestCase):
        def test_dB_to_factor(self):
            # Throw some known values at it
            self.assertAlmostEqual(1.0, dB_to_factor(0.0), places=7)
            self.assertAlmostEqual(2.0, dB_to_factor(3.0), places=2)
            self.assertAlmostEqual(10.0, dB_to_factor(10.0), places=7)
    
    
    class TestLossCompensation(base.TestCase):
        def test_integer_losses_no_remainder(self):
            losses = [1.0, 2.0, 3.0, 4.0]
    
            attenuation_integer_dB, remainder_factor = loss_compensation(
                numpy.array(losses)
            )
    
            # verify that there is no remainder
            self.assertTrue(
                numpy.all(remainder_factor == 1.0),
                msg=f"attenuation_integer_dB = {attenuation_integer_dB}, remainder_factor = {remainder_factor}",
            )
    
        def test_loss_compensation_lines_up(self):
            """Test whether signals line up after the computed delay compensation."""
    
            losses = [1.0, 2.0, 3.0, 4.0]
    
            attenuation_integer_dB, _ = loss_compensation(numpy.array(losses))
    
            # sample_shift and delay_samples together should line everything up
            effective_attenuation = losses + attenuation_integer_dB
    
            # all values must be lined up equally
            self.assertEqual(
                1,
                len(set(effective_attenuation)),
                msg=f"effective_attenuation = {effective_attenuation}, attenuation_integer_dB = {attenuation_integer_dB}, losses = {losses}",
            )
    
        def test_loss_compensation_remainder(self):
            """Test correctness of the loss compensation remainders."""
    
            # losses in dB we want to compensate for. they all round to the same integer value
            losses = [0.75, 1.0, 1.25]
    
            attenuation_integer_dB, remainder_factor = loss_compensation(
                numpy.array(losses)
            )
    
            # should not result in any sample shifts
            self.assertEqual(0, attenuation_integer_dB[0])
            self.assertEqual(0, attenuation_integer_dB[1])
            self.assertEqual(0, attenuation_integer_dB[2])
    
            # remainder should correspond with differences.
            # NB: these are the factors to apply to line up the signals.
            self.assertAlmostEqual(dB_to_factor(+0.25), remainder_factor[0])
            self.assertAlmostEqual(dB_to_factor(0.0), remainder_factor[1])
            self.assertAlmostEqual(dB_to_factor(-0.25), remainder_factor[2])
    
    
    class TestDelayCompensation(base.TestCase):
        def _compute_delay_compensation(self, delays_samples: list):
            # convert to seconds (200 MHz clock => 5 ns samples)
            clock = 200_000_000
            delays_seconds = numpy.array(delays_samples) / clock
    
            # compute delay compensation
            return delay_compensation(delays_seconds, clock)
    
        def test_whole_sample_shifts_no_remainder(self):
            """Test whether delay compensation indeed has no remainder if we shift whole samples."""
    
            # delay to compensate for, in samples
            delay_samples = [1, 2, 3, 4]
    
            _, remainder_seconds = self._compute_delay_compensation(delay_samples)
    
            # verify that there is no remainder
            self.assertTrue(numpy.all(remainder_seconds == 0.0), msg=f"{remainder_seconds}")
    
        def test_sample_shifts_line_up(self):
            """Test whether signals line up after the computed delay compensation."""
    
            # delay to compensate for, in samples
            delay_samples = [1, 2, 3, 4]
    
            sample_shift, _ = self._compute_delay_compensation(delay_samples)
    
            # sample_shift and delay_samples together should line everything up
            effective_signal_delay = delay_samples + sample_shift
    
            # all values must be lined up equally
            self.assertEqual(
                1,
                len(set(effective_signal_delay)),
                msg=f"effective_signal_delay = {effective_signal_delay}, sample_shift = {sample_shift}, delay_samples = {delay_samples}",
            )
    
        def test_delay_compensation_remainder(self):
            """Test correctness of the delay compensation remainders."""
    
            # delays in samples we want to compensate for. they all round to the same sample
            delay_samples = [0.75, 1.0, 1.25]
    
            sample_shift, remainder_seconds = self._compute_delay_compensation(
                delay_samples
            )
    
            # should not result in any sample shifts
            self.assertEqual(0, sample_shift[0])
            self.assertEqual(0, sample_shift[1])
            self.assertEqual(0, sample_shift[2])
    
            # remainder should correspond with differences.
            # NB: these are the remainders to apply to line up the signals.
            self.assertAlmostEqual(+0.25, remainder_seconds[0] / 5e-9)
            self.assertAlmostEqual(0.00, remainder_seconds[1] / 5e-9)
            self.assertAlmostEqual(-0.25, remainder_seconds[2] / 5e-9)