Skip to content
Snippets Groups Projects
Commit 2462a3e0 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-1006: Move Antenna_Type as a property of AntennaField instead of SDP

parent cb1dbe66
No related branches found
No related tags found
1 merge request!451Draft: Move configuration about to implement calibration
class CableType(object):
""" A cable used in LOFAR, with its properties. """
def __init__(name: str, length: int, delay: float, loss: dict):
self.name = name # for reverse lookups
self.length = length # metres
self.delay = delay # seconds
self.loss = loss # frequency_mhz -> dB
def speed(self):
""" Return the speed of the signal in this cable, in m/s. """
return self.length / self.delay
def get_loss(self, antenna_type: str, rcu_band_select: int) -> float:
""" Get the appropiate loss value (in dB), for the given
antenna type and RCU band selection. """
if antenna_type == "LBA":
return self.loss[50]
elif antenna_type == "HBA":
if rcu_band_select == 1:
return self.loss[150]
elif rcu_band_select == 2:
return self.loss[200]
elif rcu_band_select == 4:
return self.loss[250]
else:
raise ValueError(f"Unsupported RCU band selection for HBA: {rcu_band_select}")
raise ValueError(f"Unsupported antenna type: {antenna_type}")
# Global list of all known cable types.
#
# NB: The LOFAR1 equivalents of these tables are:
# - MAC/Deployment/data/StaticMetaData/CableDelays/
# - MAC/Deployment/data/StaticMetaData/CableAttenuation.conf
cable_types = {}
cable_types[ "0m"] = CableType(name= "0m", length= 0, delay=000.0000e-9, loss={50: 0.00, 150: 0.00, 200: 0.00, 250: 0.00})
cable_types[ "50m"] = CableType(name= "50m", length= 50, delay=199.2573e-9, loss={50: 2.05, 150: 3.64, 200: 4.24, 250: 4.46})
cable_types[ "80m"] = CableType(name= "80m", length= 80, delay=326.9640e-9, loss={50: 3.32, 150: 5.87, 200: 6.82, 250: 7.19})
cable_types[ "85m"] = CableType(name= "85m", length= 85, delay=342.5133e-9, loss={50: 3.53, 150: 6.22, 200: 7.21, 250: 7.58})
cable_types["115m"] = CableType(name="115m", length=115, delay=465.5254e-9, loss={50: 4.74, 150: 8.35, 200: 9.70, 250: 10.18})
cable_types["120m"] = CableType(name="120m", length=120, delay=493.8617e-9, loss={50: 4.85, 150: 8.55, 200: 9.92, 250: 10.42}) # used on CS030
cable_types["130m"] = CableType(name="130m", length=130, delay=530.6981e-9, loss={50: 5.40, 150: 9.52, 200: 11.06, 250: 11.61})
......@@ -20,6 +20,7 @@ from tangostationcontrol.common.type_checking import type_not_sequence
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.devices.lofar_device import lofar_device
from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions
from tangostationcontrol.common.cables import cable_types
from tangostationcontrol.devices.device_decorators import fault_on_error
from tangostationcontrol.beam.geo import ETRS_to_ITRF, ITRF_to_GEO, GEO_to_GEOHASH
from tangostationcontrol.beam.hba_tile import HBATAntennaOffsets, NUMBER_OF_ELEMENTS_PER_TILE
......@@ -131,6 +132,13 @@ class AntennaField(lofar_device):
default_value = numpy.array([False] * MAX_NUMBER_OF_HBAT)
)
Antenna_Cables = device_property(
doc=f"Which cables connect each antenna to the RCU. Both polarisations are assumed to be connected using the same type of cable. Needs to be any of ({', '.join(cable_types.keys())}).",
dtype='DevStringArray',
mandatory=False,
default_value = numpy.array(["0m"] * MAX_NUMBER_OF_HBAT)
)
# ----- Position information
Antenna_Field_Reference_ITRF = device_property(
......@@ -170,6 +178,7 @@ class AntennaField(lofar_device):
mandatory=False,
default_value = 2015.5
)
HBAT_PQR_rotation_angles_deg = device_property(
doc='Rotation of each tile in the PQ plane ("horizontal") in degrees.',
dtype='DevVarFloatArray',
......@@ -225,6 +234,10 @@ class AntennaField(lofar_device):
default_value = [-1] * MAX_NUMBER_OF_HBAT * 2
)
# the X and Y signals traverse over the power and control lines, respectively
X_to_RECV_mapping = Power_to_RECV_mapping
Y_to_RECV_mapping = Control_to_RECV_mapping
RECV_devices = device_property(
dtype=(str,),
doc='Which Recv devices are in use by the AntennaField. The order is important and it should match up with the order of the mapping.',
......@@ -249,6 +262,13 @@ class AntennaField(lofar_device):
Antenna_Usage_Mask_R = attribute(doc='Whether each antenna will be used.',
dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT)
Antenna_Cables_R = attribute(doc=f"Which cables connect each antenna to the RCU. Both polarisations are assumed to be connected using the same type of cable. Needs to be any of ({', '.join(cable_types.keys())}).",
dtype=(str,), max_dim_x=MAX_NUMBER_OF_HBAT)
Antenna_Cables_Length_R = attribute(doc=f"Length of the cable between antenna and RCU, in metres.",
dtype=(numpy.uint32,), max_dim_x=MAX_NUMBER_OF_HBAT, unit="m")
Antenna_Cables_Delay_R = attribute(doc=f"Delay caused by the cable between antenna and RCU, in seconds.",
dtype=(numpy.float64,), max_dim_x=MAX_NUMBER_OF_HBAT, unit="s")
Antenna_to_SDP_Mapping_R = attribute(doc='To which (fpga, input) pair each antenna is connected. -1=unconnected.',
dtype=((numpy.int32,),), max_dim_x=2, max_dim_y=MAX_NUMBER_OF_HBAT)
......@@ -263,6 +283,7 @@ class AntennaField(lofar_device):
HBAT_PWR_LNA_on_RW = mapped_attribute("HBAT_PWR_LNA_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_PWR_on_R = mapped_attribute("HBAT_PWR_on_R", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_PWR_on_RW = mapped_attribute("HBAT_PWR_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
RCU_attenuator_dB_RW = mapped_attribute("RCU_attenuator_db_RW", dtype=(numpy.int64,), max_dim_x=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
RCU_band_select_RW = mapped_attribute("RCU_band_select_RW", dtype=(numpy.int64,), max_dim_x=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
# ----- Position information
......@@ -329,6 +350,15 @@ class AntennaField(lofar_device):
def read_Antenna_to_SDP_Mapping_R(self):
return numpy.array(self.Antenna_to_SDP_Mapping).reshape(-1,2)
def read_Antenna_Cables_R(self):
return self.Antenna_Cables
def read_Antenna_Cables_Length_R(self):
return [cables.cable_types[antenna].length for antenna in self.Antenna_Cables]
def read_Antenna_Cables_Delay_R(self):
return [cables.cable_types[antenna].delay for antenna in self.Antenna_Cables]
def read_nr_antennas_R(self):
# The number of antennas should be equal to:
# * the number of elements in the Control_to_RECV_mapping (after reshaping),
......@@ -336,7 +366,8 @@ class AntennaField(lofar_device):
# * the number of antennas exposed through Antenna_Reference_ITRF_R.
# * the number of elements in Antenna_Use
# * the number of elements in Antenna_Quality
# * the number of elements in Antenna_to_SDP_Mapping
# * the number of elements as pairs in Antenna_to_SDP_Mapping
# * the number of elements as pairs in Antenna_Cables
#
# Parsing a property here is quickest, so we chose that.
return len(self.Control_to_RECV_mapping) // 2
......@@ -475,31 +506,95 @@ class AntennaField(lofar_device):
# Turn on power to antennas that need it (and due to the ANT_mask, that we're using)
self.proxy.write_attribute('RCU_PWR_ANT_on_RW', self.Antenna_Needs_Power)
self.calibrate_recv()
# -----------------------------------------------------------
# Configure SDP
# -----------------------------------------------------------
self.configure_sdp()
self.calibrate_sdp()
# --------
# Commands
# --------
@command()
def calibrate_recv(self):
# -----------------------------------------------------------
# Tune the RCU attenuation to compensate for differences
# in cable loss.
# -----------------------------------------------------------
cable_names = self.read_attribute("Antenna_Cables_R")
rcu_bands = self.read_attribute("RCU_band_select_RW")
cable_losses = numpy.array([cable_types[name].get_loss(self.Antenna_Type, rcu_bands[antenna_nr]) for antenna_nr, name in enumerate(cable_names)])
max_loss = max(cable_losses)
self.write_attribute("RCU_attenuator_db_RW", max_loss - cable_losses)
@command()
def configure_sdp(self):
""" Configure SDP to process our antennas. """
# upload which antenna type we're using
# Mapping [antenna] -> [fpga][input]
antenna_to_sdp_mapping = self.read_attribute("Antenna_to_SDP_Mapping_R")
# -----------------------------------------------------------
# Upload which antenna type we're using
# -----------------------------------------------------------
# read-modify-write on [fpga][(input, polarisation)]
sdp_antenna_type = self.sdp_proxy.antenna_type_RW
for fpga_nr, input_nr in self.read_attribute("Antenna_to_SDP_Mapping_R"):
for fpga_nr, input_nr in antenna_to_sdp_mapping:
# set for x polarisation
sdp_antenna_type[fpga_nr, input_nr * 2 + 0] = self.Antenna_Type
# set for y polarisation
sdp_antenna_type[fpga_nr, input_nr * 2 + 1] = self.Antenna_Type
self.sdp_proxy.antenna_type_RW = sdp_antenna_type
@command()
def calibrate_sdp(self):
""" Calibrate SDP to process our antennas. """
clock = self.sdp_proxy.read_attribute("clock_RW")
# Mapping [antenna] -> [fpga][input]
antenna_to_sdp_mapping = self.read_attribute("Antenna_to_SDP_Mapping_R")
# -----------------------------------------------------------
# Set signal-input delays to compensate for differences
# in cable length.
# -----------------------------------------------------------
# The delay to correct for, [antenna] (we assume the same
# delay for both X and Y).
cable_names = self.read_attribute("Antenna_Cables_R")
# delay for each cable, in seconds
signal_delay_seconds = numpy.array([cable_types[name].delay for name in cable_names])
# convert to a delay in samples (200 MHz clock == 5ns samples)
signal_delay_samples = numpy.round(signal_delay_seconds * clock).astype(numpy.uint32)
# correct for the coarse delay by delaying the other signals to line up
# we cannot configure a negative number of samples, so we must delay
# all of them sufficiently as well.
#
# This introduces a constant shift in timing for all samples,
# as we shift all of them to obtain a non-negative delay.
input_samples_delay = max(signal_delay_samples) - signal_delay_samples
# read-modify-write on [fpga][(input, polarisation)]
fpga_signal_input_samples_delay = self.sdp_proxy.FPGA_signal_input_samples_delay_RW
for antenna_nr, (fpga_nr, input_nr) in enumerate(antenna_to_sdp_mapping):
# set for X polarisation
fpga_signal_input_samples_delay[fpga_nr, input_nr * 2 + 0] = input_samples_delay[antenna_nr]
# set for Y polarisation
fpga_signal_input_samples_delay[fpga_nr, input_nr * 2 + 1] = input_samples_delay[antenna_nr]
self.sdp_proxy.FPGA_signal_input_samples_delay_RW = fpga_signal_input_samples_delay
@command(dtype_in=DevVarFloatArray, dtype_out=DevVarLongArray)
def calculate_HBAT_bf_delay_steps(self, delays: numpy.ndarray):
num_tiles = self.read_nr_antennas_R()
......@@ -557,7 +652,8 @@ class AntennaToRecvMapper(object):
"HBAT_PWR_LNA_on_RW": value_map_ant_32_bool,
"HBAT_PWR_on_R": value_map_ant_32_bool,
"HBAT_PWR_on_RW": value_map_ant_32_bool,
"RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64)
"RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64),
"RCU_attenuator_dB_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64)
}
self._masked_value_mapping_write = {
"ANT_mask_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
......@@ -567,6 +663,7 @@ class AntennaToRecvMapper(object):
"HBAT_PWR_LNA_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_PWR_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"RCU_band_select_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"RCU_attenuator_dB_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
}
self._reshape_attributes_in = {
"HBAT_BF_delay_steps_RW": (96, 32),
......
......@@ -247,7 +247,7 @@ class Boot(lofar_device):
"STAT/SST/1",
"STAT/XST/1",
"STAT/Beamlet/1",
"STAT/AntennaField/1", # Accesses RECV
"STAT/AntennaField/1", # Accesses RECV and SDP
"STAT/TileBeam/1", # Accesses AntennaField
"STAT/DigitalBeam/1", # Accessed SDP and Beamlet
"STAT/TemperatureManager/1",
......
......@@ -17,6 +17,7 @@ from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
from tangostationcontrol.devices.opcua_device import opcua_device
from tangostationcontrol.devices.sdp.sdp import SDP
from tangostationcontrol.devices.sdp.common import nyquist_zone, phases_to_weights, subband_frequencies
import numpy
from functools import lru_cache
......@@ -209,8 +210,8 @@ class Beamlet(opcua_device):
# subscribe to events to notice setting changes in SDP that determine the input frequency
self.event_subscriptions = {}
self.event_subscriptions["clock_rw"] = self.sdp_proxy.subscribe_event("clock_RW", EventType.CHANGE_EVENT, self._frequency_change_event, stateless=True)
self.event_subscriptions["nyquist_zone_r"] = self.sdp_proxy.subscribe_event("nyquist_zone_R", EventType.CHANGE_EVENT, self._frequency_change_event, stateless=True)
self.event_subscriptions["clock_RW"] = self.sdp_proxy.subscribe_event("clock_RW", EventType.CHANGE_EVENT, self._frequency_change_event, stateless=True)
self.event_subscriptions["antenna_types_RW"] = self.sdp_proxy.subscribe_event("antenna_types_RW", EventType.CHANGE_EVENT, self._frequency_change_event, stateless=True)
def configure_for_off(self):
super().configure_for_off()
......@@ -252,7 +253,8 @@ class Beamlet(opcua_device):
where 'frequency' is the subband frequency:
LBA: frequency = (subband_nr + 0) * clock / 1024
HBA: frequency = (subband_nr + 512) * clock / 1024
HBA 160 MHz: frequency = (subband_nr + 512) * clock / 1024
HBA 200 MHz: frequency = (subband_nr + 1024) * clock / 1024
The beamformer combines a set of antennas for each beamlet, and each beamlet can have a different pointing
and subband selected.
......@@ -263,45 +265,6 @@ class Beamlet(opcua_device):
The phases, delays, and final beam weights, all have shape (fpga_nr, [input_nr][pol][beamlet_nr]).
"""
BF_UNIT_WEIGHT = 2**14
@staticmethod
def _phases_to_bf_weights(phases: numpy.ndarray):
""" Convert differences in phase (in radians) into FPGA weights (complex numbers packed into uint32). """
# flatten array and restore its shape later, which makes running over all elements a lot easier
orig_shape = phases.shape
phases = phases.flatten()
# Convert array values in complex numbers
real = Beamlet.BF_UNIT_WEIGHT * numpy.cos(phases)
imag = Beamlet.BF_UNIT_WEIGHT * numpy.sin(phases)
# Interleave into (real, imag) pairs, and store as int16
# see also https://stackoverflow.com/questions/5347065/interweaving-two-numpy-arrays/5347492
# Round to nearest integer instead of rounding down
real_imag = numpy.empty(phases.size * 2, dtype=numpy.int16)
real_imag[0::2] = numpy.round(real)
real_imag[1::2] = numpy.round(imag)
# Cast each (real, imag) pair into an uint32, which brings the array size
# back to the original.
bf_weights = real_imag.view(numpy.uint32)
return bf_weights.reshape(orig_shape)
@staticmethod
def _subband_frequencies(subbands: numpy.ndarray, clock: int, nyquist_zone: int) -> numpy.ndarray:
""" Obtain the frequencies of each subband, given a clock and an antenna type. """
subband_width = clock / 1024
base_subband = nyquist_zone * 512
# broadcast clock across frequencies
frequencies = (subbands + base_subband) * subband_width
return frequencies
@lru_cache() # this function requires large hardware reads for values that don't change often
def _beamlet_frequencies(self):
""" Obtain the frequencies (in Hz) of each subband that is selected for each input and beamlet.
......@@ -311,7 +274,13 @@ class Beamlet(opcua_device):
# obtain which subband is selected for each input and beamlet
beamlet_subbands = self.read_attribute("FPGA_beamlet_subband_select_RW") # (fpga_nr, [input_nr][pol][beamlet_nr])
return self._subband_frequencies(beamlet_subbands, self.sdp_proxy.clock_RW, self.sdp_proxy.nyquist_zone_R)
# obtain which nyquist zones are used on the FPGAs
nyquist_zones = nyquist_zone(self.sdp_proxy.antenna_types_RW, self.sdp_proxy.clock_RW)
return subband_frequencies(beamlet_subbands, self.sdp_proxy.clock_RW, nyquist_zones)
BF_UNIT_WEIGHT = 2**14
@staticmethod
def _calculate_bf_weights(delays: numpy.ndarray, beamlet_frequencies: numpy.ndarray):
......@@ -326,7 +295,7 @@ class Beamlet(opcua_device):
beamlet_phases = (-2.0 * numpy.pi) * beamlet_frequencies * delays
# convert to weights
bf_weights = Beamlet._phases_to_bf_weights(beamlet_phases)
bf_weights = phases_to_weights(beamlet_phases, Beamlet.BF_UNIT_WEIGHT)
return bf_weights
......
import numpy
def nyquist_zone(self, clock: float, antenna_type: str) -> int:
""" Return the Nyquist zone for the given clock (in Hz),
and antenna type ("LBA" or "HBA").
The Nyquist zone determines the frequency offset of
the antennas.
NOTE: Only 160 and 200 MHz clocks are supported. """
# (AntennaType, clockMHz) -> Nyquist zone
nyquist_zones = {
("LBA", 160): 0,
("LBA", 200): 0,
("HBA", 160): 1,
("HBA", 200): 2,
}
try:
return nyquist_zones[(self.antenna_type, clock // 1000000)]
except KeyError:
raise ValueError(f"Could not determine Nyquist zone for antenna type {self.AntennaType} with clock {clock} Hz")
def subband_frequency(subband: int, clock: float, nyquist_zone: int) -> int:
""" Obtain the frequencies of a subband, given a clock and an antenna type. """
subband_width = clock / 1024
return (512 * nyquist_zone + subband) * subband_width
def subband_frequencies(subbands: numpy.ndarray, clock: float, nyquist_zone: int) -> numpy.ndarray:
""" Obtain the frequencies of each subband, given a clock and an antenna type. """
return subband_frequency(subbands, clock, nyquist_zone)
def phases_to_weights(phases: numpy.ndarray, unit_weight: float) -> numpy.ndarray:
""" Convert differences in phase (in radians) into FPGA weights (complex numbers packed into uint32). """
# The FPGA accepts weights as a 16-bit (imag,real) complex pair packed into an uint32.
# flatten array and restore its shape later, which makes running over all elements a lot easier
orig_shape = phases.shape
phases = phases.flatten()
# Convert array values in complex numbers
real = unit_weight * numpy.cos(phases)
imag = unit_weight * numpy.sin(phases)
# Interleave into (real, imag) pairs, and store as int16
# see also https://stackoverflow.com/questions/5347065/interweaving-two-numpy-arrays/5347492
# Round to nearest integer instead of rounding down
real_imag = numpy.empty(phases.size * 2, dtype=numpy.int16)
real_imag[0::2] = numpy.round(real)
real_imag[1::2] = numpy.round(imag)
# Cast each (real, imag) pair into an uint32, which brings the array size
# back to the original.
weights = real_imag.view(numpy.uint32)
return weights.reshape(orig_shape)
......@@ -12,14 +12,15 @@
"""
# PyTango imports
from tango.server import device_property, attribute
from tango import AttrWriteType
from tango.server import device_property, attribute, command
from tango import AttrWriteType, DeviceProxy
# Additional import
from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
from tangostationcontrol.devices.opcua_device import opcua_device
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.common.lofar_logging import device_logging_to_python
from tangostationcontrol.devices.sdp.common import nyquist_zone, phases_to_weights, subband_frequency
import numpy
......@@ -182,9 +183,6 @@ class SDP(opcua_device):
antenna_type_RW = attribute(doc='Type of antenna (LBA or HBA) attached to each input of the FPGAs',
dtype=(str,), max_dim_y=N_pn, max_dim_x=S_pn,
access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_access_allowed")
nyquist_zone_R = attribute(doc='Nyquist zone of the input frequencies',
dtype=numpy.uint32, fisallowed="is_attribute_access_allowed",
polling_period=1000, abs_change=1)
clock_RW = attribute(doc='Configured sampling clock (Hz)',
dtype=numpy.uint32, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_access_allowed",
polling_period=1000, abs_change=1)
......@@ -202,34 +200,8 @@ class SDP(opcua_device):
self._antenna_type = value
def _nyquist_zone(self, clock):
""" Return the Nyquist zone for the given clock (in Hz).
The Nyquist zone determines the frequency offset of
the antennas.
NOTE: Only 160 and 200 MHz clocks are supported. """
# (AntennaType, clockMHz) -> Nyquist zone
nyquist_zones = {
("LBA", 160): 0,
("LBA", 200): 0,
("HBA", 160): 1,
("HBA", 200): 2,
}
try:
# support only one AntennaType for now. TODO: expose nyquist zones as an array
return nyquist_zones[(self._antenna_type[0][0], clock // 1000000)]
except KeyError:
raise ValueError(f"Could not determine Nyquist zone for antenna type {self.AntennaType} with clock {clock} Hz")
def read_nyquist_zone_R(self):
try:
return self._nyquist_zone(self.read_attribute("clock_RW"))
except ValueError:
# Supply a sane default for computations in tests until L2SDP-725 allows us to read back the set clock
return 0
# This changes the Nyquist zone
self.update_nyquist_zones()
def read_clock_RW(self):
# We can only return a single value, so we assume the FPGA is configured coherently. Which is something
......@@ -248,8 +220,25 @@ class SDP(opcua_device):
# Tell all FPGAs to use this clock
self.proxy.FPGA_pps_expected_cnt_RW = [clock] * self.N_pn
# Also update the packet headers
self.proxy.FPGA_sdp_info_nyquist_sampling_zone_index_RW = [self._nyquist_zone(clock)] * self.N_pn
# This changes the Nyquist zone
self.update_nyquist_zones()
def update_nyquist_zones(self):
""" Reconfigure everything that depends on the Nyquist zone of an input. """
# derive the nyquist zones per input
clock = self.read_attribute("clock_RW")
antenna_to_nyquist_zone = numpy.vectorize(lambda antenna_type: nyquist_zone(antenna_type, clock))
nyquist_zones_per_polarised_input = antenna_to_nyquist_zone(self._antenna_type)
# derive the nyquist zone per FPGA. We use the first input per FPGA to be representative.
nyquist_zones_per_fpga = nyquist_zones_per_polarised_input[:,0]
# update the packet headers
self.proxy.FPGA_sdp_info_nyquist_sampling_zone_index_RW = nyquist_zones_per_fpga
@command(doc_in='Apply calibration and configuration to process the given AntennaField. NB: This can be performed for multiple AntennaFields, as long as their Antenna_to_SDP_Mappings do not clash.',
dtype_in=str)
# ----------
# Summarising Attributes
......@@ -315,6 +304,24 @@ class SDP(opcua_device):
# Commands
# --------
# A weight representing no scaling in FPGA_subband_weights_R(W)
SUBBAND_UNIT_WEIGHT = 2**13
@staticmethod
def _subband_weights(signal_delay_seconds: numpy.ndarray, clock: int, nyquist_zone: int) -> numpy.ndarray:
""" Compute the FPGA_subband_weights_RW rows for our antennas. """
subband_phases = numpy.zeros((len(signal_delay_seconds), 512), dtype=numpy.float64)
for subband_nr in range(512):
frequency = subband_frequency(subband_nr, clock, nyquist_zone)
# correct for the delay by rotating the signal *back*
subband_phases[:, subband_nr] = (-2.0 * numpy.pi) * frequency * signal_delay_seconds
# convert phases to their complex equivalent, as FPGA weights (cint16 packed into uint32)
return phases_to_weights(subband_phases, SDP.SUBBAND_UNIT_WEIGHT)
# ----------
# Run server
# ----------
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment