Skip to content
Snippets Groups Projects
Commit d3e692e1 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-2017-fix-samples-delay-comp' into 'master'

L2SS-2017: Fix rounding of input samples delay

Closes L2SS-2017

See merge request !996
parents 5c1276b8 293b40e0
No related branches found
No related tags found
1 merge request!996L2SS-2017: Fix rounding of input samples delay
......@@ -151,6 +151,7 @@ Next change the version in the following places:
# Release Notes
* 0.43.1 Fix rounding for the coarse delay and loss compensations.
* 0.43.0 Use PyTango 10.0.0
* 0.42.12 Calibration device now fetches the station's name from the StationManager
* 0.42.11 Fix HBA inner antenna mask for remote stations
......
0.43.0
0.43.1
......@@ -218,7 +218,7 @@ class CalibrationManager:
)
def delay_compensation(delays_seconds: numpy.ndarray, clock: int):
def delay_compensation(delays_seconds: numpy.ndarray, clock: int) -> numpy.ndarray:
"""Return the delay compensation required to line up
signals that are delayed by "delays" seconds. The returned values
are the delay to apply, in samples (coarse) and remaining seconds
......@@ -238,8 +238,8 @@ def delay_compensation(delays_seconds: numpy.ndarray, clock: int):
# chain, while input_* are the amount of (negative) delay to apply
# to compensate.
# compute the coarse correction, in samples
signal_delays_samples = numpy.round(delays_seconds * clock).astype(numpy.uint32)
# compute the correction, in samples
signal_delays_samples = delays_seconds * clock
# correct for the coarse delay by delaying the other signals to line up
# we cannot configure a negative number of samples, so we must delay
......@@ -247,18 +247,11 @@ def delay_compensation(delays_seconds: numpy.ndarray, clock: int):
#
# This introduces a constant shift in timing for all samples,
# as we shift all of them to obtain a non-negative delay.
input_delays_samples = max(signal_delays_samples) - signal_delays_samples
input_delays_samples = numpy.round(
max(signal_delays_samples) - signal_delays_samples
).astype(numpy.int32)
# compute the remainder, in seconds
signal_delays_subsample_seconds = delays_seconds - signal_delays_samples / clock
input_delays_subsample_seconds = -signal_delays_subsample_seconds
return input_delays_samples, input_delays_subsample_seconds
def dB_to_factor(dB: numpy.ndarray) -> numpy.ndarray:
"""Convert values in decibel (dB) into their equivalent scaling factors."""
return 10 ** (dB / 10)
return input_delays_samples
def calibrate_input_samples_delay(
......@@ -278,7 +271,7 @@ def calibrate_input_samples_delay(
# compute the required compensation
clock = sdpfirmware.clock_RW
input_samples_delay, _ = delay_compensation(signal_delay_seconds, clock)
input_samples_delay = delay_compensation(signal_delay_seconds, clock)
# read-modify-write on [fpga][(input, polarisation)]
fpga_signal_input_samples_delay = sdp.FPGA_signal_input_samples_delay_RW
......@@ -315,7 +308,7 @@ def calibrate_RCU_attenuator_dB(antenna_field: DeviceProxy):
# return coarse attenuation to apply (weakest signal
# gets 0 attenuation).
rcu_attenuator_db, _ = loss_compensation(signal_delay_loss)
rcu_attenuator_db = loss_compensation(signal_delay_loss)
# add field-wide attenuation
rcu_attenuator_db += antenna_field.Field_Attenuation_R
......@@ -324,36 +317,23 @@ def calibrate_RCU_attenuator_dB(antenna_field: DeviceProxy):
antenna_field.RCU_attenuator_dB_RW = rcu_attenuator_db.astype(numpy.int64)
def loss_compensation(losses_dB: numpy.ndarray):
def loss_compensation(losses_dB: numpy.ndarray) -> numpy.ndarray:
"""Return the attenuation required to line up
signals that are dampened by "lossed_dB" decibel.
Returned are the signal attenuations in whole dBs (coarse), and
the remaining scaling (as a factor), as a tuple (whole dBs, remainder).
Returned are the signal attenuations in whole dBs (coarse).
The coarse attenuation is to be applied in recv.RCU_attenuation_dB_RW,
the fine scaling is to be incorporated into sdp.FPGA_subband_weights_RW.
the remainder is to be incorporated into sdp.FPGA_subband_weights_RW.
Applying this correction equalises the signal across the inputs
to be dampened max(round(losses_dB)) instead of their value
in losses_dB. So we do _not_ fully dampen towards the weakest signal.
"""
# NB: signal_* are the amount of loss the signal obtained in our processing
# chain, while input_* are the amount of (dampening) attenuation to apply
# to compensate.
# compute the coarse correction, in samples
signal_attenuation_integer_dB = numpy.round(losses_dB).astype(numpy.uint32)
# correct for the coarse loss by dampening the signals to line up.
input_attenuation_integer_dB = (
numpy.max(signal_attenuation_integer_dB) - signal_attenuation_integer_dB
input_attenuation_integer_dB = numpy.round(numpy.max(losses_dB) - losses_dB).astype(
numpy.int32
)
# compute the remainder, as a scaling factor
signal_loss_remainder_dB = losses_dB - signal_attenuation_integer_dB
input_attenuation_remainder_dB = -signal_loss_remainder_dB
input_attenuation_remainder_factor = dB_to_factor(input_attenuation_remainder_dB)
return (input_attenuation_integer_dB, input_attenuation_remainder_factor)
return input_attenuation_integer_dB
......@@ -12,7 +12,6 @@ from tangostationcontrol.common import consul
from tangostationcontrol.common.calibration import (
delay_compensation,
loss_compensation,
dB_to_factor,
CalibrationManager,
CalibrationTable,
)
......@@ -187,34 +186,13 @@ class TestCalibrationManager(base.TestCase):
)
class TestCalibration(base.TestCase):
def test_dB_to_factor(self):
# Throw some known values at it
self.assertAlmostEqual(1.0, dB_to_factor(0.0), places=7)
self.assertAlmostEqual(2.0, dB_to_factor(3.0), places=2)
self.assertAlmostEqual(10.0, dB_to_factor(10.0), places=7)
class TestLossCompensation(base.TestCase):
def test_integer_losses_no_remainder(self):
losses = [1.0, 2.0, 3.0, 4.0]
attenuation_integer_dB, remainder_factor = loss_compensation(
numpy.array(losses)
)
# verify that there is no remainder
self.assertTrue(
numpy.all(remainder_factor == 1.0),
msg=f"attenuation_integer_dB = {attenuation_integer_dB}, remainder_factor = {remainder_factor}",
)
def test_loss_compensation_lines_up(self):
"""Test whether signals line up after the computed delay compensation."""
losses = [1.0, 2.0, 3.0, 4.0]
attenuation_integer_dB, _ = loss_compensation(numpy.array(losses))
attenuation_integer_dB = loss_compensation(numpy.array(losses))
# sample_shift and delay_samples together should line everything up
effective_attenuation = losses + attenuation_integer_dB
......@@ -232,20 +210,24 @@ class TestLossCompensation(base.TestCase):
# losses in dB we want to compensate for. they all round to the same integer value
losses = [0.75, 1.0, 1.25]
attenuation_integer_dB, remainder_factor = loss_compensation(
numpy.array(losses)
)
attenuation_integer_dB = loss_compensation(numpy.array(losses))
# should not result in any sample shifts
self.assertEqual(0, attenuation_integer_dB[0])
self.assertEqual(0, attenuation_integer_dB[1])
self.assertEqual(0, attenuation_integer_dB[2])
# remainder should correspond with differences.
# NB: these are the factors to apply to line up the signals.
self.assertAlmostEqual(dB_to_factor(+0.25), remainder_factor[0])
self.assertAlmostEqual(dB_to_factor(0.0), remainder_factor[1])
self.assertAlmostEqual(dB_to_factor(-0.25), remainder_factor[2])
def test_round_nearest(self):
"""Test whether we round to the nearest sample."""
# losses in dB we want to compensate for. they all round to the same integer value
losses = [1.6, 0.0]
attenuation_integer_dB = loss_compensation(numpy.array(losses))
# should not result in any sample shifts
self.assertEqual(0, attenuation_integer_dB[0])
self.assertEqual(2, attenuation_integer_dB[1])
class TestDelayCompensation(base.TestCase):
......@@ -257,24 +239,13 @@ class TestDelayCompensation(base.TestCase):
# compute delay compensation
return delay_compensation(delays_seconds, clock)
def test_whole_sample_shifts_no_remainder(self):
"""Test whether delay compensation indeed has no remainder if we shift whole samples."""
# delay to compensate for, in samples
delay_samples = [1, 2, 3, 4]
_, remainder_seconds = self._compute_delay_compensation(delay_samples)
# verify that there is no remainder
self.assertTrue(numpy.all(remainder_seconds == 0.0), msg=f"{remainder_seconds}")
def test_sample_shifts_line_up(self):
"""Test whether signals line up after the computed delay compensation."""
# delay to compensate for, in samples
delay_samples = [1, 2, 3, 4]
sample_shift, _ = self._compute_delay_compensation(delay_samples)
sample_shift = self._compute_delay_compensation(delay_samples)
# sample_shift and delay_samples together should line everything up
effective_signal_delay = delay_samples + sample_shift
......@@ -290,19 +261,41 @@ class TestDelayCompensation(base.TestCase):
"""Test correctness of the delay compensation remainders."""
# delays in samples we want to compensate for. they all round to the same sample
delay_samples = [0.75, 1.0, 1.25]
delay_samples = [0.76, 1.0, 1.25]
sample_shift, remainder_seconds = self._compute_delay_compensation(
delay_samples
)
sample_shift = self._compute_delay_compensation(delay_samples)
# should not result in any sample shifts
self.assertEqual(0, sample_shift[0])
self.assertEqual(0, sample_shift[1])
self.assertEqual(0, sample_shift[2])
# remainder should correspond with differences.
# NB: these are the remainders to apply to line up the signals.
self.assertAlmostEqual(+0.25, remainder_seconds[0] / 5e-9)
self.assertAlmostEqual(0.00, remainder_seconds[1] / 5e-9)
self.assertAlmostEqual(-0.25, remainder_seconds[2] / 5e-9)
def test_delay_round_nearest(self):
"""Test correctness of the delay compensation rounding."""
# delays in samples we want to compensate for. they all round to the same sample
delay_samples = [0.0, 1.6]
sample_shift = self._compute_delay_compensation(delay_samples)
# should not result in any sample shifts
self.assertEqual(2, sample_shift[0])
self.assertEqual(0, sample_shift[1])
def test_delay_against_LOFAR1(self):
"""Test correctness of the delay compensation regression against LOFAR1."""
# INT HBA: 130m, 115m, 85m
delay_seconds = numpy.array([530.6981e-9, 465.5254e-9, 342.5133e-9])
sample_shift = delay_compensation(delay_seconds, 200_000_000)
self.assertListEqual([0, 13, 38], sample_shift.tolist())
# RS HBA: 115m, 85m
delay_seconds = numpy.array([465.5254e-9, 342.5133e-9])
sample_shift = delay_compensation(delay_seconds, 200_000_000)
self.assertListEqual([0, 25], sample_shift.tolist())
# LBA: 115m, 80m, 50m
delay_seconds = numpy.array([465.5254e-9, 326.9640e-9, 199.2573e-9])
sample_shift = delay_compensation(delay_seconds, 200_000_000)
self.assertListEqual([0, 28, 53], sample_shift.tolist())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment