diff --git a/tangostationcontrol/tangostationcontrol/common/cables.py b/tangostationcontrol/tangostationcontrol/common/cables.py new file mode 100644 index 0000000000000000000000000000000000000000..5f8bed3d916d01a9c3623cc46aa9812ede5c9a7c --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/common/cables.py @@ -0,0 +1,45 @@ +class CableType(object): + """ A cable used in LOFAR, with its properties. """ + + def __init__(name: str, length: int, delay: float, loss: dict): + self.name = name # for reverse lookups + self.length = length # metres + self.delay = delay # seconds + self.loss = loss # frequency_mhz -> dB + + def speed(self): + """ Return the speed of the signal in this cable, in m/s. """ + + return self.length / self.delay + + def get_loss(self, antenna_type: str, rcu_band_select: int) -> float: + """ Get the appropiate loss value (in dB), for the given + antenna type and RCU band selection. """ + + if antenna_type == "LBA": + return self.loss[50] + elif antenna_type == "HBA": + if rcu_band_select == 1: + return self.loss[150] + elif rcu_band_select == 2: + return self.loss[200] + elif rcu_band_select == 4: + return self.loss[250] + else: + raise ValueError(f"Unsupported RCU band selection for HBA: {rcu_band_select}") + + raise ValueError(f"Unsupported antenna type: {antenna_type}") + +# Global list of all known cable types. +# +# NB: The LOFAR1 equivalents of these tables are: +# - MAC/Deployment/data/StaticMetaData/CableDelays/ +# - MAC/Deployment/data/StaticMetaData/CableAttenuation.conf +cable_types = {} +cable_types[ "0m"] = CableType(name= "0m", length= 0, delay=000.0000e-9, loss={50: 0.00, 150: 0.00, 200: 0.00, 250: 0.00}) +cable_types[ "50m"] = CableType(name= "50m", length= 50, delay=199.2573e-9, loss={50: 2.05, 150: 3.64, 200: 4.24, 250: 4.46}) +cable_types[ "80m"] = CableType(name= "80m", length= 80, delay=326.9640e-9, loss={50: 3.32, 150: 5.87, 200: 6.82, 250: 7.19}) +cable_types[ "85m"] = CableType(name= "85m", length= 85, delay=342.5133e-9, loss={50: 3.53, 150: 6.22, 200: 7.21, 250: 7.58}) +cable_types["115m"] = CableType(name="115m", length=115, delay=465.5254e-9, loss={50: 4.74, 150: 8.35, 200: 9.70, 250: 10.18}) +cable_types["120m"] = CableType(name="120m", length=120, delay=493.8617e-9, loss={50: 4.85, 150: 8.55, 200: 9.92, 250: 10.42}) # used on CS030 +cable_types["130m"] = CableType(name="130m", length=130, delay=530.6981e-9, loss={50: 5.40, 150: 9.52, 200: 11.06, 250: 11.61}) diff --git a/tangostationcontrol/tangostationcontrol/devices/antennafield.py b/tangostationcontrol/tangostationcontrol/devices/antennafield.py index 2e76235e2151cd49d4bb5e03b2b5ffffae89714e..2f03b7e7f0eaac90ebae39d5eb439319073af88d 100644 --- a/tangostationcontrol/tangostationcontrol/devices/antennafield.py +++ b/tangostationcontrol/tangostationcontrol/devices/antennafield.py @@ -20,6 +20,7 @@ from tangostationcontrol.common.type_checking import type_not_sequence from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.devices.lofar_device import lofar_device from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions +from tangostationcontrol.common.cables import cable_types from tangostationcontrol.devices.device_decorators import fault_on_error from tangostationcontrol.beam.geo import ETRS_to_ITRF, ITRF_to_GEO, GEO_to_GEOHASH from tangostationcontrol.beam.hba_tile import HBATAntennaOffsets, NUMBER_OF_ELEMENTS_PER_TILE @@ -131,6 +132,13 @@ class AntennaField(lofar_device): default_value = numpy.array([False] * MAX_NUMBER_OF_HBAT) ) + Antenna_Cables = device_property( + doc=f"Which cables connect each antenna to the RCU. Both polarisations are assumed to be connected using the same type of cable. Needs to be any of ({', '.join(cable_types.keys())}).", + dtype='DevStringArray', + mandatory=False, + default_value = numpy.array(["0m"] * MAX_NUMBER_OF_HBAT) + ) + # ----- Position information Antenna_Field_Reference_ITRF = device_property( @@ -170,6 +178,7 @@ class AntennaField(lofar_device): mandatory=False, default_value = 2015.5 ) + HBAT_PQR_rotation_angles_deg = device_property( doc='Rotation of each tile in the PQ plane ("horizontal") in degrees.', dtype='DevVarFloatArray', @@ -225,6 +234,10 @@ class AntennaField(lofar_device): default_value = [-1] * MAX_NUMBER_OF_HBAT * 2 ) + # the X and Y signals traverse over the power and control lines, respectively + X_to_RECV_mapping = Power_to_RECV_mapping + Y_to_RECV_mapping = Control_to_RECV_mapping + RECV_devices = device_property( dtype=(str,), doc='Which Recv devices are in use by the AntennaField. The order is important and it should match up with the order of the mapping.', @@ -249,6 +262,13 @@ class AntennaField(lofar_device): Antenna_Usage_Mask_R = attribute(doc='Whether each antenna will be used.', dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT) + Antenna_Cables_R = attribute(doc=f"Which cables connect each antenna to the RCU. Both polarisations are assumed to be connected using the same type of cable. Needs to be any of ({', '.join(cable_types.keys())}).", + dtype=(str,), max_dim_x=MAX_NUMBER_OF_HBAT) + Antenna_Cables_Length_R = attribute(doc=f"Length of the cable between antenna and RCU, in metres.", + dtype=(numpy.uint32,), max_dim_x=MAX_NUMBER_OF_HBAT, unit="m") + Antenna_Cables_Delay_R = attribute(doc=f"Delay caused by the cable between antenna and RCU, in seconds.", + dtype=(numpy.float64,), max_dim_x=MAX_NUMBER_OF_HBAT, unit="s") + Antenna_to_SDP_Mapping_R = attribute(doc='To which (fpga, input) pair each antenna is connected. -1=unconnected.', dtype=((numpy.int32,),), max_dim_x=2, max_dim_y=MAX_NUMBER_OF_HBAT) @@ -263,6 +283,7 @@ class AntennaField(lofar_device): HBAT_PWR_LNA_on_RW = mapped_attribute("HBAT_PWR_LNA_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE) HBAT_PWR_on_R = mapped_attribute("HBAT_PWR_on_R", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT) HBAT_PWR_on_RW = mapped_attribute("HBAT_PWR_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE) + RCU_attenuator_dB_RW = mapped_attribute("RCU_attenuator_db_RW", dtype=(numpy.int64,), max_dim_x=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE) RCU_band_select_RW = mapped_attribute("RCU_band_select_RW", dtype=(numpy.int64,), max_dim_x=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE) # ----- Position information @@ -329,6 +350,15 @@ class AntennaField(lofar_device): def read_Antenna_to_SDP_Mapping_R(self): return numpy.array(self.Antenna_to_SDP_Mapping).reshape(-1,2) + def read_Antenna_Cables_R(self): + return self.Antenna_Cables + + def read_Antenna_Cables_Length_R(self): + return [cables.cable_types[antenna].length for antenna in self.Antenna_Cables] + + def read_Antenna_Cables_Delay_R(self): + return [cables.cable_types[antenna].delay for antenna in self.Antenna_Cables] + def read_nr_antennas_R(self): # The number of antennas should be equal to: # * the number of elements in the Control_to_RECV_mapping (after reshaping), @@ -336,7 +366,8 @@ class AntennaField(lofar_device): # * the number of antennas exposed through Antenna_Reference_ITRF_R. # * the number of elements in Antenna_Use # * the number of elements in Antenna_Quality - # * the number of elements in Antenna_to_SDP_Mapping + # * the number of elements as pairs in Antenna_to_SDP_Mapping + # * the number of elements as pairs in Antenna_Cables # # Parsing a property here is quickest, so we chose that. return len(self.Control_to_RECV_mapping) // 2 @@ -475,31 +506,95 @@ class AntennaField(lofar_device): # Turn on power to antennas that need it (and due to the ANT_mask, that we're using) self.proxy.write_attribute('RCU_PWR_ANT_on_RW', self.Antenna_Needs_Power) + self.calibrate_recv() + # ----------------------------------------------------------- # Configure SDP # ----------------------------------------------------------- self.configure_sdp() + self.calibrate_sdp() # -------- # Commands # -------- + + @command() + def calibrate_recv(self): + # ----------------------------------------------------------- + # Tune the RCU attenuation to compensate for differences + # in cable loss. + # ----------------------------------------------------------- + + cable_names = self.read_attribute("Antenna_Cables_R") + rcu_bands = self.read_attribute("RCU_band_select_RW") + + cable_losses = numpy.array([cable_types[name].get_loss(self.Antenna_Type, rcu_bands[antenna_nr]) for antenna_nr, name in enumerate(cable_names)]) + max_loss = max(cable_losses) + + self.write_attribute("RCU_attenuator_db_RW", max_loss - cable_losses) + @command() def configure_sdp(self): """ Configure SDP to process our antennas. """ - # upload which antenna type we're using + # Mapping [antenna] -> [fpga][input] + antenna_to_sdp_mapping = self.read_attribute("Antenna_to_SDP_Mapping_R") + + # ----------------------------------------------------------- + # Upload which antenna type we're using + # ----------------------------------------------------------- # read-modify-write on [fpga][(input, polarisation)] sdp_antenna_type = self.sdp_proxy.antenna_type_RW - for fpga_nr, input_nr in self.read_attribute("Antenna_to_SDP_Mapping_R"): + for fpga_nr, input_nr in antenna_to_sdp_mapping: # set for x polarisation sdp_antenna_type[fpga_nr, input_nr * 2 + 0] = self.Antenna_Type # set for y polarisation sdp_antenna_type[fpga_nr, input_nr * 2 + 1] = self.Antenna_Type - self.sdp_proxy.antenna_type_RW = sdp_antenna_type + @command() + def calibrate_sdp(self): + """ Calibrate SDP to process our antennas. """ + + clock = self.sdp_proxy.read_attribute("clock_RW") + + # Mapping [antenna] -> [fpga][input] + antenna_to_sdp_mapping = self.read_attribute("Antenna_to_SDP_Mapping_R") + + # ----------------------------------------------------------- + # Set signal-input delays to compensate for differences + # in cable length. + # ----------------------------------------------------------- + + # The delay to correct for, [antenna] (we assume the same + # delay for both X and Y). + cable_names = self.read_attribute("Antenna_Cables_R") + + # delay for each cable, in seconds + signal_delay_seconds = numpy.array([cable_types[name].delay for name in cable_names]) + + # convert to a delay in samples (200 MHz clock == 5ns samples) + signal_delay_samples = numpy.round(signal_delay_seconds * clock).astype(numpy.uint32) + + # correct for the coarse delay by delaying the other signals to line up + # we cannot configure a negative number of samples, so we must delay + # all of them sufficiently as well. + # + # This introduces a constant shift in timing for all samples, + # as we shift all of them to obtain a non-negative delay. + input_samples_delay = max(signal_delay_samples) - signal_delay_samples + + # read-modify-write on [fpga][(input, polarisation)] + fpga_signal_input_samples_delay = self.sdp_proxy.FPGA_signal_input_samples_delay_RW + for antenna_nr, (fpga_nr, input_nr) in enumerate(antenna_to_sdp_mapping): + # set for X polarisation + fpga_signal_input_samples_delay[fpga_nr, input_nr * 2 + 0] = input_samples_delay[antenna_nr] + # set for Y polarisation + fpga_signal_input_samples_delay[fpga_nr, input_nr * 2 + 1] = input_samples_delay[antenna_nr] + self.sdp_proxy.FPGA_signal_input_samples_delay_RW = fpga_signal_input_samples_delay + @command(dtype_in=DevVarFloatArray, dtype_out=DevVarLongArray) def calculate_HBAT_bf_delay_steps(self, delays: numpy.ndarray): num_tiles = self.read_nr_antennas_R() @@ -557,7 +652,8 @@ class AntennaToRecvMapper(object): "HBAT_PWR_LNA_on_RW": value_map_ant_32_bool, "HBAT_PWR_on_R": value_map_ant_32_bool, "HBAT_PWR_on_RW": value_map_ant_32_bool, - "RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64) + "RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64), + "RCU_attenuator_dB_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64) } self._masked_value_mapping_write = { "ANT_mask_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, @@ -567,6 +663,7 @@ class AntennaToRecvMapper(object): "HBAT_PWR_LNA_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32, "HBAT_PWR_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32, "RCU_band_select_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, + "RCU_attenuator_dB_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96, } self._reshape_attributes_in = { "HBAT_BF_delay_steps_RW": (96, 32), diff --git a/tangostationcontrol/tangostationcontrol/devices/boot.py b/tangostationcontrol/tangostationcontrol/devices/boot.py index 42ce74d74c7efc6ca27d4b4ef8a53988441fd04a..165032313f9a00657d0fc6a9fed73f94c2beb4ba 100644 --- a/tangostationcontrol/tangostationcontrol/devices/boot.py +++ b/tangostationcontrol/tangostationcontrol/devices/boot.py @@ -247,7 +247,7 @@ class Boot(lofar_device): "STAT/SST/1", "STAT/XST/1", "STAT/Beamlet/1", - "STAT/AntennaField/1", # Accesses RECV + "STAT/AntennaField/1", # Accesses RECV and SDP "STAT/TileBeam/1", # Accesses AntennaField "STAT/DigitalBeam/1", # Accessed SDP and Beamlet "STAT/TemperatureManager/1", diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py b/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py index 24dda0332e8711ffd5b67226f4a7d4b1bdc1e2d3..c0307f8d8446ead2b771bbc9597efd76341604fb 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py @@ -17,6 +17,7 @@ from tangostationcontrol.common.lofar_logging import log_exceptions from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.devices.opcua_device import opcua_device from tangostationcontrol.devices.sdp.sdp import SDP +from tangostationcontrol.devices.sdp.common import nyquist_zone, phases_to_weights, subband_frequencies import numpy from functools import lru_cache @@ -209,8 +210,8 @@ class Beamlet(opcua_device): # subscribe to events to notice setting changes in SDP that determine the input frequency self.event_subscriptions = {} - self.event_subscriptions["clock_rw"] = self.sdp_proxy.subscribe_event("clock_RW", EventType.CHANGE_EVENT, self._frequency_change_event, stateless=True) - self.event_subscriptions["nyquist_zone_r"] = self.sdp_proxy.subscribe_event("nyquist_zone_R", EventType.CHANGE_EVENT, self._frequency_change_event, stateless=True) + self.event_subscriptions["clock_RW"] = self.sdp_proxy.subscribe_event("clock_RW", EventType.CHANGE_EVENT, self._frequency_change_event, stateless=True) + self.event_subscriptions["antenna_types_RW"] = self.sdp_proxy.subscribe_event("antenna_types_RW", EventType.CHANGE_EVENT, self._frequency_change_event, stateless=True) def configure_for_off(self): super().configure_for_off() @@ -252,7 +253,8 @@ class Beamlet(opcua_device): where 'frequency' is the subband frequency: LBA: frequency = (subband_nr + 0) * clock / 1024 - HBA: frequency = (subband_nr + 512) * clock / 1024 + HBA 160 MHz: frequency = (subband_nr + 512) * clock / 1024 + HBA 200 MHz: frequency = (subband_nr + 1024) * clock / 1024 The beamformer combines a set of antennas for each beamlet, and each beamlet can have a different pointing and subband selected. @@ -263,45 +265,6 @@ class Beamlet(opcua_device): The phases, delays, and final beam weights, all have shape (fpga_nr, [input_nr][pol][beamlet_nr]). """ - BF_UNIT_WEIGHT = 2**14 - - @staticmethod - def _phases_to_bf_weights(phases: numpy.ndarray): - """ Convert differences in phase (in radians) into FPGA weights (complex numbers packed into uint32). """ - - # flatten array and restore its shape later, which makes running over all elements a lot easier - orig_shape = phases.shape - phases = phases.flatten() - - # Convert array values in complex numbers - real = Beamlet.BF_UNIT_WEIGHT * numpy.cos(phases) - imag = Beamlet.BF_UNIT_WEIGHT * numpy.sin(phases) - - # Interleave into (real, imag) pairs, and store as int16 - # see also https://stackoverflow.com/questions/5347065/interweaving-two-numpy-arrays/5347492 - # Round to nearest integer instead of rounding down - real_imag = numpy.empty(phases.size * 2, dtype=numpy.int16) - real_imag[0::2] = numpy.round(real) - real_imag[1::2] = numpy.round(imag) - - # Cast each (real, imag) pair into an uint32, which brings the array size - # back to the original. - bf_weights = real_imag.view(numpy.uint32) - - return bf_weights.reshape(orig_shape) - - @staticmethod - def _subband_frequencies(subbands: numpy.ndarray, clock: int, nyquist_zone: int) -> numpy.ndarray: - """ Obtain the frequencies of each subband, given a clock and an antenna type. """ - - subband_width = clock / 1024 - base_subband = nyquist_zone * 512 - - # broadcast clock across frequencies - frequencies = (subbands + base_subband) * subband_width - - return frequencies - @lru_cache() # this function requires large hardware reads for values that don't change often def _beamlet_frequencies(self): """ Obtain the frequencies (in Hz) of each subband that is selected for each input and beamlet. @@ -311,7 +274,13 @@ class Beamlet(opcua_device): # obtain which subband is selected for each input and beamlet beamlet_subbands = self.read_attribute("FPGA_beamlet_subband_select_RW") # (fpga_nr, [input_nr][pol][beamlet_nr]) - return self._subband_frequencies(beamlet_subbands, self.sdp_proxy.clock_RW, self.sdp_proxy.nyquist_zone_R) + + # obtain which nyquist zones are used on the FPGAs + nyquist_zones = nyquist_zone(self.sdp_proxy.antenna_types_RW, self.sdp_proxy.clock_RW) + + return subband_frequencies(beamlet_subbands, self.sdp_proxy.clock_RW, nyquist_zones) + + BF_UNIT_WEIGHT = 2**14 @staticmethod def _calculate_bf_weights(delays: numpy.ndarray, beamlet_frequencies: numpy.ndarray): @@ -326,7 +295,7 @@ class Beamlet(opcua_device): beamlet_phases = (-2.0 * numpy.pi) * beamlet_frequencies * delays # convert to weights - bf_weights = Beamlet._phases_to_bf_weights(beamlet_phases) + bf_weights = phases_to_weights(beamlet_phases, Beamlet.BF_UNIT_WEIGHT) return bf_weights diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/common.py b/tangostationcontrol/tangostationcontrol/devices/sdp/common.py new file mode 100644 index 0000000000000000000000000000000000000000..ab45f0cbcbf8c7595d6d6e1c0efd01313f1ee46e --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/common.py @@ -0,0 +1,58 @@ +import numpy + +def nyquist_zone(self, clock: float, antenna_type: str) -> int: + """ Return the Nyquist zone for the given clock (in Hz), + and antenna type ("LBA" or "HBA"). + + The Nyquist zone determines the frequency offset of + the antennas. + + NOTE: Only 160 and 200 MHz clocks are supported. """ + + # (AntennaType, clockMHz) -> Nyquist zone + nyquist_zones = { + ("LBA", 160): 0, + ("LBA", 200): 0, + ("HBA", 160): 1, + ("HBA", 200): 2, + } + + try: + return nyquist_zones[(self.antenna_type, clock // 1000000)] + except KeyError: + raise ValueError(f"Could not determine Nyquist zone for antenna type {self.AntennaType} with clock {clock} Hz") + +def subband_frequency(subband: int, clock: float, nyquist_zone: int) -> int: + """ Obtain the frequencies of a subband, given a clock and an antenna type. """ + subband_width = clock / 1024 + return (512 * nyquist_zone + subband) * subband_width + +def subband_frequencies(subbands: numpy.ndarray, clock: float, nyquist_zone: int) -> numpy.ndarray: + """ Obtain the frequencies of each subband, given a clock and an antenna type. """ + return subband_frequency(subbands, clock, nyquist_zone) + +def phases_to_weights(phases: numpy.ndarray, unit_weight: float) -> numpy.ndarray: + """ Convert differences in phase (in radians) into FPGA weights (complex numbers packed into uint32). """ + + # The FPGA accepts weights as a 16-bit (imag,real) complex pair packed into an uint32. + + # flatten array and restore its shape later, which makes running over all elements a lot easier + orig_shape = phases.shape + phases = phases.flatten() + + # Convert array values in complex numbers + real = unit_weight * numpy.cos(phases) + imag = unit_weight * numpy.sin(phases) + + # Interleave into (real, imag) pairs, and store as int16 + # see also https://stackoverflow.com/questions/5347065/interweaving-two-numpy-arrays/5347492 + # Round to nearest integer instead of rounding down + real_imag = numpy.empty(phases.size * 2, dtype=numpy.int16) + real_imag[0::2] = numpy.round(real) + real_imag[1::2] = numpy.round(imag) + + # Cast each (real, imag) pair into an uint32, which brings the array size + # back to the original. + weights = real_imag.view(numpy.uint32) + + return weights.reshape(orig_shape) diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py b/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py index d897a36168756f44c919e2b113ab1fd4e659c28b..9d84737f8e50986d33dd02ffc9652cb91214d7e3 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py @@ -12,14 +12,15 @@ """ # PyTango imports -from tango.server import device_property, attribute -from tango import AttrWriteType +from tango.server import device_property, attribute, command +from tango import AttrWriteType, DeviceProxy # Additional import from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.devices.opcua_device import opcua_device from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.common.lofar_logging import device_logging_to_python +from tangostationcontrol.devices.sdp.common import nyquist_zone, phases_to_weights, subband_frequency import numpy @@ -182,9 +183,6 @@ class SDP(opcua_device): antenna_type_RW = attribute(doc='Type of antenna (LBA or HBA) attached to each input of the FPGAs', dtype=(str,), max_dim_y=N_pn, max_dim_x=S_pn, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_access_allowed") - nyquist_zone_R = attribute(doc='Nyquist zone of the input frequencies', - dtype=numpy.uint32, fisallowed="is_attribute_access_allowed", - polling_period=1000, abs_change=1) clock_RW = attribute(doc='Configured sampling clock (Hz)', dtype=numpy.uint32, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_access_allowed", polling_period=1000, abs_change=1) @@ -202,34 +200,8 @@ class SDP(opcua_device): self._antenna_type = value - def _nyquist_zone(self, clock): - """ Return the Nyquist zone for the given clock (in Hz). - - The Nyquist zone determines the frequency offset of - the antennas. - - NOTE: Only 160 and 200 MHz clocks are supported. """ - - # (AntennaType, clockMHz) -> Nyquist zone - nyquist_zones = { - ("LBA", 160): 0, - ("LBA", 200): 0, - ("HBA", 160): 1, - ("HBA", 200): 2, - } - - try: - # support only one AntennaType for now. TODO: expose nyquist zones as an array - return nyquist_zones[(self._antenna_type[0][0], clock // 1000000)] - except KeyError: - raise ValueError(f"Could not determine Nyquist zone for antenna type {self.AntennaType} with clock {clock} Hz") - - def read_nyquist_zone_R(self): - try: - return self._nyquist_zone(self.read_attribute("clock_RW")) - except ValueError: - # Supply a sane default for computations in tests until L2SDP-725 allows us to read back the set clock - return 0 + # This changes the Nyquist zone + self.update_nyquist_zones() def read_clock_RW(self): # We can only return a single value, so we assume the FPGA is configured coherently. Which is something @@ -248,8 +220,25 @@ class SDP(opcua_device): # Tell all FPGAs to use this clock self.proxy.FPGA_pps_expected_cnt_RW = [clock] * self.N_pn - # Also update the packet headers - self.proxy.FPGA_sdp_info_nyquist_sampling_zone_index_RW = [self._nyquist_zone(clock)] * self.N_pn + # This changes the Nyquist zone + self.update_nyquist_zones() + + def update_nyquist_zones(self): + """ Reconfigure everything that depends on the Nyquist zone of an input. """ + + # derive the nyquist zones per input + clock = self.read_attribute("clock_RW") + antenna_to_nyquist_zone = numpy.vectorize(lambda antenna_type: nyquist_zone(antenna_type, clock)) + nyquist_zones_per_polarised_input = antenna_to_nyquist_zone(self._antenna_type) + + # derive the nyquist zone per FPGA. We use the first input per FPGA to be representative. + nyquist_zones_per_fpga = nyquist_zones_per_polarised_input[:,0] + + # update the packet headers + self.proxy.FPGA_sdp_info_nyquist_sampling_zone_index_RW = nyquist_zones_per_fpga + + @command(doc_in='Apply calibration and configuration to process the given AntennaField. NB: This can be performed for multiple AntennaFields, as long as their Antenna_to_SDP_Mappings do not clash.', + dtype_in=str) # ---------- # Summarising Attributes @@ -315,6 +304,24 @@ class SDP(opcua_device): # Commands # -------- + # A weight representing no scaling in FPGA_subband_weights_R(W) + SUBBAND_UNIT_WEIGHT = 2**13 + + @staticmethod + def _subband_weights(signal_delay_seconds: numpy.ndarray, clock: int, nyquist_zone: int) -> numpy.ndarray: + """ Compute the FPGA_subband_weights_RW rows for our antennas. """ + + subband_phases = numpy.zeros((len(signal_delay_seconds), 512), dtype=numpy.float64) + + for subband_nr in range(512): + frequency = subband_frequency(subband_nr, clock, nyquist_zone) + + # correct for the delay by rotating the signal *back* + subband_phases[:, subband_nr] = (-2.0 * numpy.pi) * frequency * signal_delay_seconds + + # convert phases to their complex equivalent, as FPGA weights (cint16 packed into uint32) + return phases_to_weights(subband_phases, SDP.SUBBAND_UNIT_WEIGHT) + # ---------- # Run server # ----------