diff --git a/README.md b/README.md index 480c86204f16bd7e3aa17f1ea635ba24a19879bd..0524b6992bb416de20d4c4650edae0bf25ff4a6f 100644 --- a/README.md +++ b/README.md @@ -151,6 +151,7 @@ Next change the version in the following places: # Release Notes +* 0.43.1 Fix rounding for the coarse delay and loss compensations. * 0.43.0 Use PyTango 10.0.0 * 0.42.12 Calibration device now fetches the station's name from the StationManager * 0.42.11 Fix HBA inner antenna mask for remote stations diff --git a/tangostationcontrol/VERSION b/tangostationcontrol/VERSION index 8298bb08b2d508998f7c7e6acc20c2a5177598dc..f8287cf9564db399c11ab5b0578cb0bb2e36bcf0 100644 --- a/tangostationcontrol/VERSION +++ b/tangostationcontrol/VERSION @@ -1 +1 @@ -0.43.0 +0.43.1 diff --git a/tangostationcontrol/tangostationcontrol/common/calibration.py b/tangostationcontrol/tangostationcontrol/common/calibration.py index bf8a2518f49d53e2afe555348eae37523087bb3d..b952b42f6ecc3ac66ce02b4da6d7724419dc9cf3 100644 --- a/tangostationcontrol/tangostationcontrol/common/calibration.py +++ b/tangostationcontrol/tangostationcontrol/common/calibration.py @@ -218,7 +218,7 @@ class CalibrationManager: ) -def delay_compensation(delays_seconds: numpy.ndarray, clock: int): +def delay_compensation(delays_seconds: numpy.ndarray, clock: int) -> numpy.ndarray: """Return the delay compensation required to line up signals that are delayed by "delays" seconds. The returned values are the delay to apply, in samples (coarse) and remaining seconds @@ -238,8 +238,8 @@ def delay_compensation(delays_seconds: numpy.ndarray, clock: int): # chain, while input_* are the amount of (negative) delay to apply # to compensate. - # compute the coarse correction, in samples - signal_delays_samples = numpy.round(delays_seconds * clock).astype(numpy.uint32) + # compute the correction, in samples + signal_delays_samples = delays_seconds * clock # correct for the coarse delay by delaying the other signals to line up # we cannot configure a negative number of samples, so we must delay @@ -247,18 +247,11 @@ def delay_compensation(delays_seconds: numpy.ndarray, clock: int): # # This introduces a constant shift in timing for all samples, # as we shift all of them to obtain a non-negative delay. - input_delays_samples = max(signal_delays_samples) - signal_delays_samples + input_delays_samples = numpy.round( + max(signal_delays_samples) - signal_delays_samples + ).astype(numpy.int32) - # compute the remainder, in seconds - signal_delays_subsample_seconds = delays_seconds - signal_delays_samples / clock - input_delays_subsample_seconds = -signal_delays_subsample_seconds - - return input_delays_samples, input_delays_subsample_seconds - - -def dB_to_factor(dB: numpy.ndarray) -> numpy.ndarray: - """Convert values in decibel (dB) into their equivalent scaling factors.""" - return 10 ** (dB / 10) + return input_delays_samples def calibrate_input_samples_delay( @@ -278,7 +271,7 @@ def calibrate_input_samples_delay( # compute the required compensation clock = sdpfirmware.clock_RW - input_samples_delay, _ = delay_compensation(signal_delay_seconds, clock) + input_samples_delay = delay_compensation(signal_delay_seconds, clock) # read-modify-write on [fpga][(input, polarisation)] fpga_signal_input_samples_delay = sdp.FPGA_signal_input_samples_delay_RW @@ -315,7 +308,7 @@ def calibrate_RCU_attenuator_dB(antenna_field: DeviceProxy): # return coarse attenuation to apply (weakest signal # gets 0 attenuation). - rcu_attenuator_db, _ = loss_compensation(signal_delay_loss) + rcu_attenuator_db = loss_compensation(signal_delay_loss) # add field-wide attenuation rcu_attenuator_db += antenna_field.Field_Attenuation_R @@ -324,36 +317,23 @@ def calibrate_RCU_attenuator_dB(antenna_field: DeviceProxy): antenna_field.RCU_attenuator_dB_RW = rcu_attenuator_db.astype(numpy.int64) -def loss_compensation(losses_dB: numpy.ndarray): +def loss_compensation(losses_dB: numpy.ndarray) -> numpy.ndarray: """Return the attenuation required to line up signals that are dampened by "lossed_dB" decibel. - Returned are the signal attenuations in whole dBs (coarse), and - the remaining scaling (as a factor), as a tuple (whole dBs, remainder). + Returned are the signal attenuations in whole dBs (coarse). The coarse attenuation is to be applied in recv.RCU_attenuation_dB_RW, - the fine scaling is to be incorporated into sdp.FPGA_subband_weights_RW. + the remainder is to be incorporated into sdp.FPGA_subband_weights_RW. Applying this correction equalises the signal across the inputs to be dampened max(round(losses_dB)) instead of their value in losses_dB. So we do _not_ fully dampen towards the weakest signal. """ - # NB: signal_* are the amount of loss the signal obtained in our processing - # chain, while input_* are the amount of (dampening) attenuation to apply - # to compensate. - - # compute the coarse correction, in samples - signal_attenuation_integer_dB = numpy.round(losses_dB).astype(numpy.uint32) - # correct for the coarse loss by dampening the signals to line up. - input_attenuation_integer_dB = ( - numpy.max(signal_attenuation_integer_dB) - signal_attenuation_integer_dB + input_attenuation_integer_dB = numpy.round(numpy.max(losses_dB) - losses_dB).astype( + numpy.int32 ) - # compute the remainder, as a scaling factor - signal_loss_remainder_dB = losses_dB - signal_attenuation_integer_dB - input_attenuation_remainder_dB = -signal_loss_remainder_dB - input_attenuation_remainder_factor = dB_to_factor(input_attenuation_remainder_dB) - - return (input_attenuation_integer_dB, input_attenuation_remainder_factor) + return input_attenuation_integer_dB diff --git a/tangostationcontrol/test/common/test_calibration.py b/tangostationcontrol/test/common/test_calibration.py index d599ac619c4f116df30f3d32ab4af7ea876da55a..7b801d6a8eb2a1c23f05400910e7534386db6aa7 100644 --- a/tangostationcontrol/test/common/test_calibration.py +++ b/tangostationcontrol/test/common/test_calibration.py @@ -12,7 +12,6 @@ from tangostationcontrol.common import consul from tangostationcontrol.common.calibration import ( delay_compensation, loss_compensation, - dB_to_factor, CalibrationManager, CalibrationTable, ) @@ -187,34 +186,13 @@ class TestCalibrationManager(base.TestCase): ) -class TestCalibration(base.TestCase): - def test_dB_to_factor(self): - # Throw some known values at it - self.assertAlmostEqual(1.0, dB_to_factor(0.0), places=7) - self.assertAlmostEqual(2.0, dB_to_factor(3.0), places=2) - self.assertAlmostEqual(10.0, dB_to_factor(10.0), places=7) - - class TestLossCompensation(base.TestCase): - def test_integer_losses_no_remainder(self): - losses = [1.0, 2.0, 3.0, 4.0] - - attenuation_integer_dB, remainder_factor = loss_compensation( - numpy.array(losses) - ) - - # verify that there is no remainder - self.assertTrue( - numpy.all(remainder_factor == 1.0), - msg=f"attenuation_integer_dB = {attenuation_integer_dB}, remainder_factor = {remainder_factor}", - ) - def test_loss_compensation_lines_up(self): """Test whether signals line up after the computed delay compensation.""" losses = [1.0, 2.0, 3.0, 4.0] - attenuation_integer_dB, _ = loss_compensation(numpy.array(losses)) + attenuation_integer_dB = loss_compensation(numpy.array(losses)) # sample_shift and delay_samples together should line everything up effective_attenuation = losses + attenuation_integer_dB @@ -232,20 +210,24 @@ class TestLossCompensation(base.TestCase): # losses in dB we want to compensate for. they all round to the same integer value losses = [0.75, 1.0, 1.25] - attenuation_integer_dB, remainder_factor = loss_compensation( - numpy.array(losses) - ) + attenuation_integer_dB = loss_compensation(numpy.array(losses)) # should not result in any sample shifts self.assertEqual(0, attenuation_integer_dB[0]) self.assertEqual(0, attenuation_integer_dB[1]) self.assertEqual(0, attenuation_integer_dB[2]) - # remainder should correspond with differences. - # NB: these are the factors to apply to line up the signals. - self.assertAlmostEqual(dB_to_factor(+0.25), remainder_factor[0]) - self.assertAlmostEqual(dB_to_factor(0.0), remainder_factor[1]) - self.assertAlmostEqual(dB_to_factor(-0.25), remainder_factor[2]) + def test_round_nearest(self): + """Test whether we round to the nearest sample.""" + + # losses in dB we want to compensate for. they all round to the same integer value + losses = [1.6, 0.0] + + attenuation_integer_dB = loss_compensation(numpy.array(losses)) + + # should not result in any sample shifts + self.assertEqual(0, attenuation_integer_dB[0]) + self.assertEqual(2, attenuation_integer_dB[1]) class TestDelayCompensation(base.TestCase): @@ -257,24 +239,13 @@ class TestDelayCompensation(base.TestCase): # compute delay compensation return delay_compensation(delays_seconds, clock) - def test_whole_sample_shifts_no_remainder(self): - """Test whether delay compensation indeed has no remainder if we shift whole samples.""" - - # delay to compensate for, in samples - delay_samples = [1, 2, 3, 4] - - _, remainder_seconds = self._compute_delay_compensation(delay_samples) - - # verify that there is no remainder - self.assertTrue(numpy.all(remainder_seconds == 0.0), msg=f"{remainder_seconds}") - def test_sample_shifts_line_up(self): """Test whether signals line up after the computed delay compensation.""" # delay to compensate for, in samples delay_samples = [1, 2, 3, 4] - sample_shift, _ = self._compute_delay_compensation(delay_samples) + sample_shift = self._compute_delay_compensation(delay_samples) # sample_shift and delay_samples together should line everything up effective_signal_delay = delay_samples + sample_shift @@ -290,19 +261,41 @@ class TestDelayCompensation(base.TestCase): """Test correctness of the delay compensation remainders.""" # delays in samples we want to compensate for. they all round to the same sample - delay_samples = [0.75, 1.0, 1.25] + delay_samples = [0.76, 1.0, 1.25] - sample_shift, remainder_seconds = self._compute_delay_compensation( - delay_samples - ) + sample_shift = self._compute_delay_compensation(delay_samples) # should not result in any sample shifts self.assertEqual(0, sample_shift[0]) self.assertEqual(0, sample_shift[1]) self.assertEqual(0, sample_shift[2]) - # remainder should correspond with differences. - # NB: these are the remainders to apply to line up the signals. - self.assertAlmostEqual(+0.25, remainder_seconds[0] / 5e-9) - self.assertAlmostEqual(0.00, remainder_seconds[1] / 5e-9) - self.assertAlmostEqual(-0.25, remainder_seconds[2] / 5e-9) + def test_delay_round_nearest(self): + """Test correctness of the delay compensation rounding.""" + + # delays in samples we want to compensate for. they all round to the same sample + delay_samples = [0.0, 1.6] + + sample_shift = self._compute_delay_compensation(delay_samples) + + # should not result in any sample shifts + self.assertEqual(2, sample_shift[0]) + self.assertEqual(0, sample_shift[1]) + + def test_delay_against_LOFAR1(self): + """Test correctness of the delay compensation regression against LOFAR1.""" + + # INT HBA: 130m, 115m, 85m + delay_seconds = numpy.array([530.6981e-9, 465.5254e-9, 342.5133e-9]) + sample_shift = delay_compensation(delay_seconds, 200_000_000) + self.assertListEqual([0, 13, 38], sample_shift.tolist()) + + # RS HBA: 115m, 85m + delay_seconds = numpy.array([465.5254e-9, 342.5133e-9]) + sample_shift = delay_compensation(delay_seconds, 200_000_000) + self.assertListEqual([0, 25], sample_shift.tolist()) + + # LBA: 115m, 80m, 50m + delay_seconds = numpy.array([465.5254e-9, 326.9640e-9, 199.2573e-9]) + sample_shift = delay_compensation(delay_seconds, 200_000_000) + self.assertListEqual([0, 28, 53], sample_shift.tolist())