Select Git revision
combinatorialthresholder.h
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_calibration.py 11.37 KiB
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import os
from os import path
from unittest.mock import patch, Mock, call, PropertyMock
from test import base
import numpy
from numpy.testing import assert_array_equal
from tangostationcontrol.common import consul
from tangostationcontrol.common.calibration import (
delay_compensation,
loss_compensation,
dB_to_factor,
CalibrationManager,
CalibrationTable,
)
from tangostationcontrol.common.constants import S_pn, N_subbands, N_pn, SDP_UNIT_WEIGHT
from tangostationcontrol.common.sdp import complex_to_weights
class MockMinio:
def __init__(self, **kwargs):
self.args = kwargs
def new_lookup_service(*args, **kwargs):
yield consul.Service(host="test", port=9000, addr="test")
@patch("tangostationcontrol.common.calibration.Minio")
@patch("tangostationcontrol.common.consul.lookup_service", new=new_lookup_service)
@patch.dict(
os.environ,
{"MINIO_ROOT_USER": "my_user", "MINIO_ROOT_PASSWORD": "my_passwd"},
clear=True,
)
class TestCalibrationManager(base.TestCase):
"""Test class for Calibration Manager"""
def test_sync_calibration_tables(self, minio):
"""Test whether calibration table files are correctly retrieved"""
minio.return_value.list_objects.return_value = [
Mock(object_name="/unittest-station/file1.h5"),
Mock(object_name="/unittest-station/file2.h5"),
Mock(object_name="/unittest-station/file3.h5"),
]
sut = CalibrationManager(
"http://server:1234/test_bucket/test_prefix", "unittest-station"
)
minio.has_call_with(
"server:1234", access_key="my_user", secret_key="my_passwd", secure=False
)
minio.return_value.list_objects.has_call_with(
"test_bucket", prefix="test_prefix/unittest-station/"
)
minio.return_value.fget_object.assert_has_calls(
[
call(
"test_bucket",
"/unittest-station/file1.h5",
path.join(sut._tmp_dir.name, "file1.h5"),
),
call(
"test_bucket",
"/unittest-station/file2.h5",
path.join(sut._tmp_dir.name, "file2.h5"),
),
call(
"test_bucket",
"/unittest-station/file3.h5",
path.join(sut._tmp_dir.name, "file3.h5"),
),
]
)
def _setup_mock_antennafield(self):
return Mock(
Antenna_to_SDP_Mapping_R=numpy.array(
[[1, 1], [1, 2]], dtype=numpy.int32
).reshape(-1, 2),
Antenna_Names_R=[f"T{n + 1}" for n in range(2)],
RCU_band_select_RW=numpy.array([[1, 1], [2, 2]]),
antenna_type_R="HBA",
**{"name.return_value": "Stat/AFH/HBA0"},
)
def _setup_mock_caltable(self):
return Mock(
observation_station="unittest-station",
antennas={
"T1": Mock(x=numpy.arange(0, 512), y=numpy.arange(512, 1024)),
"T2": Mock(x=numpy.arange(1024, 1536), y=numpy.arange(1536, 2048)),
},
)
def _test_calibration(self, nyquist_zone, hdf_reader):
"""Common logic of calibration tests"""
antenna_field_mock = self._setup_mock_antennafield()
subband_weights = numpy.array([[SDP_UNIT_WEIGHT] * S_pn * N_subbands] * N_pn)
def subband_weights_side_effect(new_value=None):
nonlocal subband_weights
if new_value is not None:
subband_weights = new_value
return subband_weights
sdp_mock = Mock(
nyquist_zone_RW=nyquist_zone,
FPGA_spectral_inversion_R=numpy.array([[0] * N_pn] * S_pn),
)
subband_property_mock = PropertyMock(side_effect=subband_weights_side_effect)
type(sdp_mock).FPGA_subband_weights_RW = subband_property_mock
caltable_mock = self._setup_mock_caltable()
hdf_reader.return_value.__enter__.return_value = caltable_mock
sut = CalibrationManager("http://server:1234", "unittest-station")
sut.calibrate_subband_weights(antenna_field_mock, sdp_mock)
hdf_reader.assert_has_calls(
[
call(
f"{sut._tmp_dir.name}/CalTable-unittest-station-HBA-200MHz.h5",
CalibrationTable,
),
call().__enter__(),
call().__exit__(None, None, None),
call(
f"{sut._tmp_dir.name}/CalTable-unittest-station-HBA-150MHz.h5",
CalibrationTable,
),
call().__enter__(),
call().__exit__(None, None, None),
]
)
return subband_weights
@patch("tangostationcontrol.common.calibration.read_hdf5")
def test_calibrate_subband_weights(self, hdf_reader, _):
"""Test whether calibration values are correctly applied to weights"""
nyquist_zone = numpy.array([[0] * N_pn] * S_pn)
subband_weights = self._test_calibration(nyquist_zone, hdf_reader)
assert_array_equal(
subband_weights[1, 1024:1536],
complex_to_weights(numpy.arange(0, 512)),
)
assert_array_equal(
subband_weights[1, 1536:2048],
complex_to_weights(numpy.arange(512, 1024)),
)
assert_array_equal(
subband_weights[1, 2048:2560],
complex_to_weights(numpy.arange(1024, 1536)),
)
assert_array_equal(
subband_weights[1, 2560:3072],
complex_to_weights(numpy.arange(1536, 2048)),
)
@patch("tangostationcontrol.common.calibration.read_hdf5")
def test_calibrate_reverse_order(self, hdf_reader, _):
"""Test whether calibration values are applied in decreasing order
when subband frequencies are decreasing"""
nyquist_zone = numpy.array(
[[0] * N_pn] + [[1] * N_pn] + [[0] * N_pn] * (S_pn - 2)
)
subband_weights = self._test_calibration(nyquist_zone, hdf_reader)
assert_array_equal(
subband_weights[1, 1024:1536],
complex_to_weights(numpy.array(range(511, -1, -1))),
)
assert_array_equal(
subband_weights[1, 1536:2048],
complex_to_weights(numpy.array(range(1023, 511, -1))),
)
assert_array_equal(
subband_weights[1, 2048:2560],
complex_to_weights(numpy.array(range(1535, 1023, -1))),
)
assert_array_equal(
subband_weights[1, 2560:3072],
complex_to_weights(numpy.array(range(2047, 1535, -1))),
)
class TestCalibration(base.TestCase):
def test_dB_to_factor(self):
# Throw some known values at it
self.assertAlmostEqual(1.0, dB_to_factor(0.0), places=7)
self.assertAlmostEqual(2.0, dB_to_factor(3.0), places=2)
self.assertAlmostEqual(10.0, dB_to_factor(10.0), places=7)
class TestLossCompensation(base.TestCase):
def test_integer_losses_no_remainder(self):
losses = [1.0, 2.0, 3.0, 4.0]
attenuation_integer_dB, remainder_factor = loss_compensation(
numpy.array(losses)
)
# verify that there is no remainder
self.assertTrue(
numpy.all(remainder_factor == 1.0),
msg=f"attenuation_integer_dB = {attenuation_integer_dB}, remainder_factor = {remainder_factor}",
)
def test_loss_compensation_lines_up(self):
"""Test whether signals line up after the computed delay compensation."""
losses = [1.0, 2.0, 3.0, 4.0]
attenuation_integer_dB, _ = loss_compensation(numpy.array(losses))
# sample_shift and delay_samples together should line everything up
effective_attenuation = losses + attenuation_integer_dB
# all values must be lined up equally
self.assertEqual(
1,
len(set(effective_attenuation)),
msg=f"effective_attenuation = {effective_attenuation}, attenuation_integer_dB = {attenuation_integer_dB}, losses = {losses}",
)
def test_loss_compensation_remainder(self):
"""Test correctness of the loss compensation remainders."""
# losses in dB we want to compensate for. they all round to the same integer value
losses = [0.75, 1.0, 1.25]
attenuation_integer_dB, remainder_factor = loss_compensation(
numpy.array(losses)
)
# should not result in any sample shifts
self.assertEqual(0, attenuation_integer_dB[0])
self.assertEqual(0, attenuation_integer_dB[1])
self.assertEqual(0, attenuation_integer_dB[2])
# remainder should correspond with differences.
# NB: these are the factors to apply to line up the signals.
self.assertAlmostEqual(dB_to_factor(+0.25), remainder_factor[0])
self.assertAlmostEqual(dB_to_factor(0.0), remainder_factor[1])
self.assertAlmostEqual(dB_to_factor(-0.25), remainder_factor[2])
class TestDelayCompensation(base.TestCase):
def _compute_delay_compensation(self, delays_samples: list):
# convert to seconds (200 MHz clock => 5 ns samples)
clock = 200_000_000
delays_seconds = numpy.array(delays_samples) / clock
# compute delay compensation
return delay_compensation(delays_seconds, clock)
def test_whole_sample_shifts_no_remainder(self):
"""Test whether delay compensation indeed has no remainder if we shift whole samples."""
# delay to compensate for, in samples
delay_samples = [1, 2, 3, 4]
_, remainder_seconds = self._compute_delay_compensation(delay_samples)
# verify that there is no remainder
self.assertTrue(numpy.all(remainder_seconds == 0.0), msg=f"{remainder_seconds}")
def test_sample_shifts_line_up(self):
"""Test whether signals line up after the computed delay compensation."""
# delay to compensate for, in samples
delay_samples = [1, 2, 3, 4]
sample_shift, _ = self._compute_delay_compensation(delay_samples)
# sample_shift and delay_samples together should line everything up
effective_signal_delay = delay_samples + sample_shift
# all values must be lined up equally
self.assertEqual(
1,
len(set(effective_signal_delay)),
msg=f"effective_signal_delay = {effective_signal_delay}, sample_shift = {sample_shift}, delay_samples = {delay_samples}",
)
def test_delay_compensation_remainder(self):
"""Test correctness of the delay compensation remainders."""
# delays in samples we want to compensate for. they all round to the same sample
delay_samples = [0.75, 1.0, 1.25]
sample_shift, remainder_seconds = self._compute_delay_compensation(
delay_samples
)
# should not result in any sample shifts
self.assertEqual(0, sample_shift[0])
self.assertEqual(0, sample_shift[1])
self.assertEqual(0, sample_shift[2])
# remainder should correspond with differences.
# NB: these are the remainders to apply to line up the signals.
self.assertAlmostEqual(+0.25, remainder_seconds[0] / 5e-9)
self.assertAlmostEqual(0.00, remainder_seconds[1] / 5e-9)
self.assertAlmostEqual(-0.25, remainder_seconds[2] / 5e-9)