Skip to content
Snippets Groups Projects
Select Git revision
  • 6150f1f54fc6613cc1781cbc7ccea9b0f374b4da
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

t_tmssapp_scheduling_REST_API.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_calibration.py 13.31 KiB
    #  Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
    #  SPDX-License-Identifier: Apache-2.0
    import os
    from os import path
    from unittest.mock import patch, Mock, call, PropertyMock
    from test import base
    
    import numpy
    from numpy.testing import assert_array_equal, assert_equal
    
    from tangostationcontrol.common import consul
    from tangostationcontrol.common.calibration import (
        delay_compensation,
        loss_compensation,
        dB_to_factor,
        CalibrationManager,
        CalibrationTable,
    )
    from tangostationcontrol.common.constants import S_pn, N_subbands, N_pn, SDP_UNIT_WEIGHT
    from tangostationcontrol.common.sdp import complex_to_weights, are_subbands_decreasing
    
    
    class MockMinio:
        """Lightweight stub that only remembers the keyword arguments it was built with."""

        def __init__(self, **options):
            # Expose the constructor keyword arguments unchanged through ``args``.
            self.args = options
    
    
    def new_lookup_service(*args, **kwargs):
        """Generator yielding a single, fixed ``consul.Service``.

        Any positional or keyword arguments are accepted and ignored.
        """
        fixed_service = consul.Service(host="test", port=9000, addr="test")
        yield fixed_service
    
    
    @patch("tangostationcontrol.common.calibration.Minio")
    @patch("tangostationcontrol.common.consul.lookup_service", new=new_lookup_service)
    @patch.dict(
        os.environ,
        {"MINIO_ROOT_USER": "my_user", "MINIO_ROOT_PASSWORD": "my_passwd"},
        clear=True,
    )
    class TestCalibrationManager(base.TestCase):
        """Test class for Calibration Manager

        The class-level patches replace the ``Minio`` name of the calibration
        module with a mock (handed to every test as its last positional argument),
        replace ``consul.lookup_service`` with ``new_lookup_service`` and reduce
        the environment to the two ``MINIO_ROOT_*`` variables.
        """

        @staticmethod
        def _make_antenna_field_mock():
            """Return a mock antenna field with the two antennas ``T1`` and ``T2``."""
            return Mock(
                Antenna_to_SDP_Mapping_R=numpy.array(
                    [[1, 1], [1, 2]], dtype=numpy.int32
                ).reshape(-1, 2),
                Antenna_Names_R=[f"T{n + 1}" for n in range(2)],
                RCU_band_select_RW=numpy.array([[1, 1], [2, 2]]),
                antenna_type_R="HBA",
                **{"name.return_value": "Stat/AFH/HBA0"},
            )

        @staticmethod
        def _make_sdp_mock(nyquist_zone):
            """Return a mock SDP whose ``FPGA_subband_weights_RW`` keeps what is written to it.

            :param nyquist_zone: array exposed as ``nyquist_zone_RW``
            """
            stored_weights = numpy.array(
                [[SDP_UNIT_WEIGHT] * S_pn * N_subbands] * N_pn
            )

            def subband_weights_side_effect(new_value=None):
                # PropertyMock calls this without arguments on read and with the
                # assigned value on write.
                nonlocal stored_weights
                if new_value is not None:
                    stored_weights = new_value
                return stored_weights

            sdp_mock = Mock(
                nyquist_zone_RW=nyquist_zone,
                FPGA_spectral_inversion_R=numpy.array([[0] * N_pn] * S_pn),
            )
            # A property has to live on the type, not on the instance.
            type(sdp_mock).FPGA_subband_weights_RW = PropertyMock(
                side_effect=subband_weights_side_effect
            )
            return sdp_mock

        @staticmethod
        def _make_caltable_mock():
            """Return a mock calibration table with x/y values for ``T1`` and ``T2``."""
            return Mock(
                observation_station="unittest-station",
                antennas={
                    "T1": Mock(x=numpy.arange(0, 512), y=numpy.arange(512, 1024)),
                    "T2": Mock(x=numpy.arange(1024, 1536), y=numpy.arange(1536, 2048)),
                },
            )

        def _calibrate(self, hdf_reader, nyquist_zone):
            """Run ``calibrate_subband_weights`` against mocked devices.

            :param hdf_reader: mock replacing ``read_hdf5`` in the calibration module
            :param nyquist_zone: array exposed as ``nyquist_zone_RW`` of the SDP mock
            :return: tuple ``(sut, sdp_mock)``
            """
            hdf_reader.return_value.__enter__.return_value = self._make_caltable_mock()
            sdp_mock = self._make_sdp_mock(nyquist_zone)

            sut = CalibrationManager("http://server:1234", "unittest-station")
            sut.calibrate_subband_weights(self._make_antenna_field_mock(), sdp_mock)
            return sut, sdp_mock

        def _assert_caltables_read(self, hdf_reader, sut):
            """Assert that the 200MHz and 150MHz HBA tables were opened, in that order."""
            hdf_reader.assert_has_calls(
                [
                    call(
                        f"{sut._tmp_dir.name}/CalTable-unittest-station-HBA-200MHz.h5",
                        CalibrationTable,
                    ),
                    call().__enter__(),
                    call().__exit__(None, None, None),
                    call(
                        f"{sut._tmp_dir.name}/CalTable-unittest-station-HBA-150MHz.h5",
                        CalibrationTable,
                    ),
                    call().__enter__(),
                    call().__exit__(None, None, None),
                ]
            )

        def test_sync_calibration_tables(self, minio):
            """Test whether calibration table files are correctly retrieved"""
            object_names = [f"/unittest-station/file{n}.h5" for n in (1, 2, 3)]
            minio.return_value.list_objects.return_value = [
                Mock(object_name=object_name) for object_name in object_names
            ]
            sut = CalibrationManager(
                "http://server:1234/test_bucket/test_prefix", "unittest-station"
            )

            # These checks used to be spelled ``has_call_with(...)``, which is not
            # part of the Mock API: Mock auto-creates the attribute and the call
            # silently succeeds, so nothing was verified.
            # NOTE(review): the expectations below were therefore never enforced;
            # if one fails, check whether the expected arguments are still current.
            minio.assert_any_call(
                "server:1234", access_key="my_user", secret_key="my_passwd", secure=False
            )
            minio.return_value.list_objects.assert_any_call(
                "test_bucket", prefix="test_prefix/unittest-station/"
            )
            minio.return_value.fget_object.assert_has_calls(
                [
                    call(
                        "test_bucket",
                        object_name,
                        path.join(sut._tmp_dir.name, path.basename(object_name)),
                    )
                    for object_name in object_names
                ]
            )

        @patch("tangostationcontrol.common.calibration.read_hdf5")
        def test_calibrate_subband_weights(self, hdf_reader, _):
            """Test whether calibration values are correctly applied to weights"""
            sut, sdp_mock = self._calibrate(
                hdf_reader, nyquist_zone=numpy.array([[0] * N_pn] * S_pn)
            )

            self._assert_caltables_read(hdf_reader, sut)

            subband_weights = sdp_mock.FPGA_subband_weights_RW
            # (columns of row 1, calibration values expected there)
            expected = (
                (slice(1024, 1536), numpy.arange(0, 512)),
                (slice(1536, 2048), numpy.arange(512, 1024)),
                (slice(2048, 2560), numpy.arange(1024, 1536)),
                (slice(2560, 3072), numpy.arange(1536, 2048)),
            )
            for columns, values in expected:
                assert_array_equal(
                    subband_weights[1, columns],
                    complex_to_weights(values),
                )

        @patch("tangostationcontrol.common.calibration.read_hdf5")
        def test_calibrate_reverse_order(self, hdf_reader, _):
            """Test whether calibration values are applied in decreasing order
            when subband frequencies are decreasing"""
            sut, sdp_mock = self._calibrate(
                hdf_reader,
                # Only the second row is put in nyquist zone 1.
                nyquist_zone=numpy.array(
                    [[0] * N_pn] + [[1] * N_pn] + [[0] * N_pn] * (S_pn - 2)
                ),
            )

            self._assert_caltables_read(hdf_reader, sut)

            assert_equal(
                are_subbands_decreasing(
                    sdp_mock.nyquist_zone_RW[0, 0],
                    sdp_mock.FPGA_spectral_inversion_R[0, 0],
                ),
                False,
            )
            assert_equal(
                are_subbands_decreasing(
                    sdp_mock.nyquist_zone_RW[1, 0],
                    # Same element as the nyquist zone it is paired with.
                    sdp_mock.FPGA_spectral_inversion_R[1, 0],
                ),
                True,
            )

            subband_weights = sdp_mock.FPGA_subband_weights_RW
            # (columns of row 1, calibration values expected there, reversed)
            expected = (
                (slice(1024, 1536), range(511, -1, -1)),
                (slice(1536, 2048), range(1023, 511, -1)),
                (slice(2048, 2560), range(1535, 1023, -1)),
                (slice(2560, 3072), range(2047, 1535, -1)),
            )
            for columns, values in expected:
                assert_array_equal(
                    subband_weights[1, columns],
                    complex_to_weights(numpy.array(values)),
                )
    
    
    class TestCalibration(base.TestCase):
        """Tests for ``dB_to_factor``."""

        def test_dB_to_factor(self):
            """Compare ``dB_to_factor`` against a few known values."""
            # (expected factor, input in dB, decimal places to compare)
            known_values = (
                (1.0, 0.0, 7),
                (2.0, 3.0, 2),
                (10.0, 10.0, 7),
            )
            for expected_factor, decibels, places in known_values:
                self.assertAlmostEqual(
                    expected_factor, dB_to_factor(decibels), places=places
                )
    
    
    class TestLossCompensation(base.TestCase):
        """Tests for ``loss_compensation``."""

        def test_integer_losses_no_remainder(self):
            """Test whether whole-dB losses leave a remainder factor of exactly 1.0."""
            losses = [1.0, 2.0, 3.0, 4.0]

            attenuation_integer_dB, remainder_factor = loss_compensation(
                numpy.array(losses)
            )

            # verify that there is no remainder
            self.assertTrue(
                numpy.all(remainder_factor == 1.0),
                msg=f"attenuation_integer_dB = {attenuation_integer_dB}, remainder_factor = {remainder_factor}",
            )

        def test_loss_compensation_lines_up(self):
            """Test whether signals line up after the computed loss compensation."""

            losses = [1.0, 2.0, 3.0, 4.0]

            attenuation_integer_dB, _ = loss_compensation(numpy.array(losses))

            # losses and the integer attenuation together should line everything up.
            # Convert both sides explicitly, so this is always an element-wise sum
            # and never a list concatenation.
            effective_attenuation = numpy.asarray(losses) + numpy.asarray(
                attenuation_integer_dB
            )

            # all values must be lined up equally
            self.assertEqual(
                1,
                len(set(effective_attenuation)),
                msg=f"effective_attenuation = {effective_attenuation}, attenuation_integer_dB = {attenuation_integer_dB}, losses = {losses}",
            )

        def test_loss_compensation_remainder(self):
            """Test correctness of the loss compensation remainders."""

            # losses in dB we want to compensate for. they all round to the same integer value
            losses = [0.75, 1.0, 1.25]

            attenuation_integer_dB, remainder_factor = loss_compensation(
                numpy.array(losses)
            )

            # should not result in any integer attenuation
            for index in range(len(losses)):
                self.assertEqual(0, attenuation_integer_dB[index])

            # remainder should correspond with differences.
            # NB: these are the factors to apply to line up the signals.
            self.assertAlmostEqual(dB_to_factor(+0.25), remainder_factor[0])
            self.assertAlmostEqual(dB_to_factor(0.0), remainder_factor[1])
            self.assertAlmostEqual(dB_to_factor(-0.25), remainder_factor[2])
    
    
    class TestDelayCompensation(base.TestCase):
        """Tests for ``delay_compensation`` using a 200 MHz clock."""

        # 200 MHz clock => 5 ns samples. Both values are used by the tests, so
        # the sample period is derived from the clock instead of repeated.
        _CLOCK_HZ = 200_000_000
        _SAMPLE_PERIOD_S = 1 / _CLOCK_HZ

        def _compute_delay_compensation(self, delays_samples: list):
            """Convert delays in samples to seconds and run ``delay_compensation``.

            :param delays_samples: delays to compensate for, in samples
            :return: the result of ``delay_compensation``; the tests unpack it as
                     ``(sample_shift, remainder_seconds)``
            """
            # convert to seconds
            delays_seconds = numpy.array(delays_samples) / self._CLOCK_HZ

            # compute delay compensation
            return delay_compensation(delays_seconds, self._CLOCK_HZ)

        def test_whole_sample_shifts_no_remainder(self):
            """Test whether delay compensation indeed has no remainder if we shift whole samples."""

            # delay to compensate for, in samples
            delay_samples = [1, 2, 3, 4]

            _, remainder_seconds = self._compute_delay_compensation(delay_samples)

            # verify that there is no remainder
            self.assertTrue(numpy.all(remainder_seconds == 0.0), msg=f"{remainder_seconds}")

        def test_sample_shifts_line_up(self):
            """Test whether signals line up after the computed delay compensation."""

            # delay to compensate for, in samples
            delay_samples = [1, 2, 3, 4]

            sample_shift, _ = self._compute_delay_compensation(delay_samples)

            # sample_shift and delay_samples together should line everything up.
            # Convert both sides explicitly, so this is always an element-wise sum
            # and never a list concatenation.
            effective_signal_delay = numpy.asarray(delay_samples) + numpy.asarray(
                sample_shift
            )

            # all values must be lined up equally
            self.assertEqual(
                1,
                len(set(effective_signal_delay)),
                msg=f"effective_signal_delay = {effective_signal_delay}, sample_shift = {sample_shift}, delay_samples = {delay_samples}",
            )

        def test_delay_compensation_remainder(self):
            """Test correctness of the delay compensation remainders."""

            # delays in samples we want to compensate for. they all round to the same sample
            delay_samples = [0.75, 1.0, 1.25]

            sample_shift, remainder_seconds = self._compute_delay_compensation(
                delay_samples
            )

            # should not result in any sample shifts
            for index in range(len(delay_samples)):
                self.assertEqual(0, sample_shift[index])

            # remainder should correspond with differences.
            # NB: these are the remainders to apply to line up the signals.
            self.assertAlmostEqual(+0.25, remainder_seconds[0] / self._SAMPLE_PERIOD_S)
            self.assertAlmostEqual(0.00, remainder_seconds[1] / self._SAMPLE_PERIOD_S)
            self.assertAlmostEqual(-0.25, remainder_seconds[2] / self._SAMPLE_PERIOD_S)