Skip to content
Snippets Groups Projects
Commit 3abf55c2 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'expose-beamlet-frequency' into 'master'

L2SS-1880: Expose the beamlet frequencies used for beamforming

See merge request !932
parents cacfada7 eee90022
No related branches found
No related tags found
1 merge request!932L2SS-1880: Expose the beamlet frequencies used for beamforming
......@@ -161,6 +161,7 @@ Next change the version in the following places:
# Release Notes
* 0.38.4 Fixed ordering in subband_frequency_R, which broke frequency calculations for HBA
* 0.38.3 Upgraded to JupyterLab v4
* 0.38.2 Fixed polling of some attributes required by Metadata device
Wait 2s after enabling FPGA_processing_enable_RW to allow it to propagate
......
0.38.3
0.38.4
......@@ -49,8 +49,6 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
"""Intentionally recreate the device object in each test"""
super().setUp("STAT/DigitalBeam/HBA0")
self.proxy.put_property({"Beamlet_Select": [True] * N_beamlets_ctrl})
self.recv_proxy = self.setup_proxy(self.recv_iden, defaults=True)
self.sdpfirmware_proxy = self.setup_proxy(self.sdpfirmware_iden, defaults=True)
self.sdp_proxy = self.setup_proxy(self.sdp_iden)
......
......@@ -45,6 +45,7 @@ logger = logging.getLogger()
@device_metrics(
exclude=[
"beamlet_frequency_R",
"FPGA_beamlet_subband_select_*",
"FPGA_bf_weights_*",
"FPGA_beamlet_output_hdr_*",
......@@ -607,6 +608,15 @@ class Beamlet(OPCUADevice):
fisallowed="is_attribute_access_allowed",
)
beamlet_frequency_R = attribute(
doc="Frequency of each beamlet for each input",
unit="Hz",
dtype=((numpy.float64,),),
max_dim_y=N_pn * A_pn,
max_dim_x=N_beamlets_ctrl,
fisallowed="is_attribute_access_allowed",
)
def get_defaults(self, properties: Dict[str, object]) -> List[Tuple[str, object]]:
return super().get_defaults(properties) + self._beamlet_output_defaults(
properties
......@@ -712,7 +722,7 @@ class Beamlet(OPCUADevice):
def cache_clear(self):
"""Explicitly clear any caches."""
self._beamlet_frequencies.cache_clear()
self.beamlet_frequencies.cache_clear()
"""
The SDP FPGAs correct for signal-delay differences by rotating the phases of the antenna signals. A delay
......@@ -734,25 +744,10 @@ class Beamlet(OPCUADevice):
The phases, delays, and final beam weights, all have shape (fpga_nr, [input_nr][pol][beamlet_nr]).
"""
@lru_cache() # this function requires large hardware reads for values that don't change often
def _beamlet_frequencies(self):
"""Obtain the frequencies (in Hz) of each subband
that is selected for each antenna and beamlet.
Returns shape (fpga_nr, input_nr, beamlet_nr), so one value per antenna, not per input.
This makes the result directly usable for FPGA_bf_weights_pp_RW.
"""
# obtain which subband is selected for each input and beamlet
beamlet_subbands = self.read_attribute(
"FPGA_beamlet_subband_select_RW"
).reshape(
N_pn, A_pn, N_pol, N_beamlets_ctrl
) # orig: (fpga_nr, [input_nr][pol][beamlet_nr])
subband_frequencies = self.sdp_proxy.subband_frequency_R.reshape(
N_pn, A_pn, N_pol, N_subbands
) # orig: ([fpga_nr][input_nr], subband_nr)
@staticmethod
def _beamlet_frequencies(
beamlet_subbands: numpy.ndarray, subband_frequencies: numpy.ndarray
) -> numpy.ndarray:
def frequencies_per_input(fpga_nr, antenna_nr):
"""Return the frequencies for the selected subbands of the given input."""
return numpy.take(
......@@ -768,14 +763,35 @@ class Beamlet(OPCUADevice):
frequencies = numpy.array(
[
frequencies_per_input(fpga_nr, antenna_nr)
for antenna_nr in range(A_pn)
for fpga_nr in range(N_pn)
for antenna_nr in range(A_pn)
],
dtype=numpy.float64,
)
return frequencies.reshape(N_pn, A_pn, N_beamlets_ctrl)
@lru_cache() # this function requires large hardware reads for values that don't change often
def beamlet_frequencies(self):
"""Obtain the frequencies (in Hz) of each subband
that is selected for each antenna and beamlet.
Returns shape (fpga_nr, input_nr, beamlet_nr), so one value per antenna, not per input.
This makes the result directly usable for FPGA_bf_weights_pp_RW.
"""
# obtain which subband is selected for each input and beamlet
beamlet_subbands = self.read_attribute(
"FPGA_beamlet_subband_select_RW"
).reshape(
N_pn, A_pn, N_pol, N_beamlets_ctrl
) # orig: (fpga_nr, [input_nr][pol][beamlet_nr])
subband_frequencies = self.sdp_proxy.subband_frequency_R.reshape(
N_pn, A_pn, N_pol, N_subbands
) # orig: ([fpga_nr][input_nr], subband_nr)
return self._beamlet_frequencies(beamlet_subbands, subband_frequencies)
@staticmethod
def _calculate_bf_weights(
delays: numpy.ndarray, beamlet_frequencies: numpy.ndarray
......@@ -795,6 +811,9 @@ class Beamlet(OPCUADevice):
return bf_weights
def read_beamlet_frequency_R(self):
return self.beamlet_frequencies().reshape(N_pn * A_pn, N_beamlets_ctrl)
# --------
# Commands
# --------
......@@ -805,7 +824,7 @@ class Beamlet(OPCUADevice):
# Calculate the FPGA weight array
delays = delays.reshape(N_pn, A_pn, N_beamlets_ctrl)
beamlet_frequencies = self._beamlet_frequencies()
beamlet_frequencies = self.beamlet_frequencies()
bf_weights = self._calculate_bf_weights(delays, beamlet_frequencies)
return bf_weights.flatten()
......@@ -494,15 +494,10 @@ class SDP(OPCUADevice):
# to make sure the subbands are ascending in frequency
self.proxy.FPGA_spectral_inversion_RW = self._nyquist_zone % 2
@DurationMetric()
def read_subband_frequency_R(self):
clock = self.control.read_parent_attribute("clock_RW") # scalar
nyquist_zone = numpy.array(
self.read_attribute("nyquist_zone_RW")
) # N_pn x S_pn
spectral_inversion = self.read_attribute(
"FPGA_spectral_inversion_RW"
) # N_pn x S_pn
@staticmethod
def _subband_frequencies(
clock: int, nyquist_zone: numpy.ndarray, spectral_inversion: numpy.ndarray
) -> numpy.ndarray:
subband_list = numpy.array(range(N_subbands))
def frequencies_per_input(fpga_nr, input_nr):
......@@ -518,14 +513,26 @@ class SDP(OPCUADevice):
frequencies = numpy.array(
[
frequencies_per_input(fpga_nr, input_nr)
for input_nr in range(S_pn)
for fpga_nr in range(N_pn)
for input_nr in range(S_pn)
],
dtype=numpy.float64,
)
return frequencies.reshape(N_pn * S_pn, N_subbands)
@DurationMetric()
def read_subband_frequency_R(self):
clock = self.control.read_parent_attribute("clock_RW") # scalar
nyquist_zone = numpy.array(
self.read_attribute("nyquist_zone_RW")
) # N_pn x S_pn
spectral_inversion = self.read_attribute(
"FPGA_spectral_inversion_RW"
) # N_pn x S_pn
return self._subband_frequencies(clock, nyquist_zone, spectral_inversion)
# ----------
# Summarising Attributes
# ----------
......
......@@ -6,8 +6,12 @@ import numpy.testing
from tangostationcontrol.common.constants import (
CLK_200_MHZ,
A_pn,
N_pn,
N_pol,
N_beamlets_ctrl,
N_bdo_destinations_mm,
N_subbands,
)
from tangostationcontrol.common.sdp import weight_to_complex
from tangostationcontrol.devices.sdp.beamlet import Beamlet
......@@ -145,3 +149,87 @@ class TestBeamletDevice(base.TestCase):
defaults["FPGA_beamlet_output_multiple_hdr_udp_destination_port_RW"],
[PORTs + [0] * (N_bdo_destinations_mm - 4)] * N_pn,
)
def _verify_beamlet_frequencies(self, beamlet_subbands, subband_frequencies):
beamlet_frequencies = Beamlet._beamlet_frequencies(
beamlet_subbands, subband_frequencies
)
# explicitly construct expected output. We can count on the subband_frequency
# to work as that is tested elsewhere.
expected_frequencies = numpy.zeros(
(N_pn, A_pn, N_beamlets_ctrl), dtype=numpy.float64
)
for beamlet_nr in range(N_beamlets_ctrl):
for fpga_nr in range(N_pn):
for antenna_nr in range(A_pn):
pol_nr = 0
subband_nr = beamlet_subbands[
fpga_nr, antenna_nr, pol_nr, beamlet_nr
]
expected_frequencies[fpga_nr, antenna_nr, beamlet_nr] = (
subband_frequencies[fpga_nr, antenna_nr, pol_nr, subband_nr]
)
numpy.testing.assert_array_almost_equal(
beamlet_frequencies, expected_frequencies.reshape(beamlet_frequencies.shape)
)
def test_beamlet_frequencies_basic(self):
beamlet_subbands = numpy.array(
[[[list(range(N_beamlets_ctrl))] * N_pol] * A_pn] * N_pn, dtype=numpy.uint32
)
subband_indices = numpy.array(
[[[list(range(N_subbands))] * N_pol] * A_pn] * N_pn, dtype=numpy.uint32
)
subband_frequencies = subband_indices * (CLK_200_MHZ / N_subbands)
self._verify_beamlet_frequencies(beamlet_subbands, subband_frequencies)
def test_beamlet_frequencies_mixed_fpga_settings(self):
"""Test different settings for the FPGAs to verify the ordering."""
beamlet_subbands = numpy.array(
[[[list(range(N_beamlets_ctrl))] * N_pol] * A_pn] * N_pn, dtype=numpy.uint32
)
subband_indices = numpy.array(
[[[list(range(N_subbands))] * N_pol] * A_pn] * N_pn, dtype=numpy.uint32
)
subband_frequencies = subband_indices * (CLK_200_MHZ / N_subbands)
# change settings for FPGAs beyond 4
beamlet_subbands[4:, :, :, :] = 0
self._verify_beamlet_frequencies(beamlet_subbands, subband_frequencies)
def test_beamlet_frequencies_mixed_input_settings(self):
"""Test different settings for the inputs of the FPGAs to verify the ordering."""
beamlet_subbands = numpy.array(
[[[list(range(N_beamlets_ctrl))] * N_pol] * A_pn] * N_pn, dtype=numpy.uint32
)
subband_indices = numpy.array(
[[[list(range(N_subbands))] * N_pol] * A_pn] * N_pn, dtype=numpy.uint32
)
subband_frequencies = subband_indices * (CLK_200_MHZ / N_subbands)
# change settings for inputs beyond 4
beamlet_subbands[:, 4:, :, :] = 0
self._verify_beamlet_frequencies(beamlet_subbands, subband_frequencies)
def test_beamlet_frequencies_unordered_subbands(self):
"""Test a shuffled order of subbands."""
beamlet_subbands = numpy.array(
[[[list(range(N_beamlets_ctrl))] * N_pol] * A_pn] * N_pn, dtype=numpy.uint32
)
subband_indices = numpy.array(
[[[list(range(N_subbands))] * N_pol] * A_pn] * N_pn, dtype=numpy.uint32
)
# shuffle subbands (assuming 7 is coprime to N_subbands)
subband_indices = (subband_indices * 7) % N_subbands
subband_frequencies = subband_indices * (CLK_200_MHZ / N_subbands)
self._verify_beamlet_frequencies(beamlet_subbands, subband_frequencies)
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import numpy
import numpy.testing
# Internal test imports
from test.devices import device_base
from tangostationcontrol.common.constants import (
CLK_160_MHZ,
CLK_200_MHZ,
N_pn,
S_pn,
N_subbands,
)
from tangostationcontrol.common.sdp import subband_frequency
from tangostationcontrol.devices.sdp.sdp import SDP
class TestSDPDevice(device_base.DeviceTestCase):
def _verify_subband_frequencies(self, clock, nyquist_zone, spectral_inversion):
subband_frequencies = SDP._subband_frequencies(
clock, nyquist_zone, spectral_inversion
)
# explicitly construct expected output. We can count on the subband_frequency
# to work as that is tested elsewhere.
expected_frequencies = numpy.zeros(
(N_pn, S_pn, N_subbands), dtype=numpy.float64
)
for sb in range(N_subbands):
for fpga_nr in range(N_pn):
for antenna_nr in range(S_pn):
expected_frequencies[fpga_nr, antenna_nr, sb] = subband_frequency(
sb,
clock,
nyquist_zone[fpga_nr, antenna_nr],
spectral_inversion[fpga_nr, antenna_nr],
)
numpy.testing.assert_array_almost_equal(
subband_frequencies, expected_frequencies.reshape(subband_frequencies.shape)
)
def test_subband_frequencies_200mhz(self):
clock = CLK_200_MHZ
nyquist_zone = numpy.array([[1] * S_pn] * N_pn)
spectral_inversion = numpy.array([[True] * S_pn] * N_pn)
self._verify_subband_frequencies(clock, nyquist_zone, spectral_inversion)
def test_subband_frequencies_160mhz(self):
clock = CLK_160_MHZ
nyquist_zone = numpy.array([[1] * S_pn] * N_pn)
spectral_inversion = numpy.array([[True] * S_pn] * N_pn)
self._verify_subband_frequencies(clock, nyquist_zone, spectral_inversion)
def test_subband_frequencies_mixed_fpga_settings(self):
"""Test different settings for the FPGAs to verify the ordering."""
clock = CLK_200_MHZ
nyquist_zone = numpy.array([[1] * S_pn] * N_pn)
spectral_inversion = numpy.array([[True] * S_pn] * N_pn)
# change settings for FPGAs beyond 4
nyquist_zone[4:, :] = 0
spectral_inversion[4:, :] = False
self._verify_subband_frequencies(clock, nyquist_zone, spectral_inversion)
def test_subband_frequencies_mixed_input_settings(self):
"""Test different settings for the inputs of the FPGAs to verify the ordering."""
clock = CLK_200_MHZ
nyquist_zone = numpy.array([[1] * S_pn] * N_pn)
spectral_inversion = numpy.array([[True] * S_pn] * N_pn)
# change settings for inputs beyond 4
nyquist_zone[:, 4:] = 0
spectral_inversion[:, 4:] = False
self._verify_subband_frequencies(clock, nyquist_zone, spectral_inversion)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment