diff --git a/docker-compose/device-sdp.yml b/docker-compose/device-sdp.yml index 0e54304ed6ac5313390020ef985e051a2effc42d..653efb2ad352e63936f75dbfda758b3406dadb06 100644 --- a/docker-compose/device-sdp.yml +++ b/docker-compose/device-sdp.yml @@ -43,5 +43,5 @@ services: - bin/start-ds.sh # configure CORBA to _listen_ on 0:port, but tell others we're _reachable_ through ${HOSTNAME}:port, since CORBA # can't know about our Docker port forwarding - - l2ss-sdp SDP STAT -v -ORBendPoint giop:tcp:0:5701 -ORBendPointPublish giop:tcp:${HOSTNAME}:5701 + - l2ss-sdp SDP STAT -v -ORBendPoint giop:tcp:device-sdp:5701 -ORBendPointPublish giop:tcp:${HOSTNAME}:5701 restart: unless-stopped diff --git a/tangostationcontrol/tangostationcontrol/devices/beam_device.py b/tangostationcontrol/tangostationcontrol/devices/beam_device.py index 0b436d89b2191b33f059094a89740d66382d927b..e69a10c3b584a0275f364aa1c92eccb2aee651e3 100644 --- a/tangostationcontrol/tangostationcontrol/devices/beam_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/beam_device.py @@ -6,28 +6,255 @@ """Beam Abstract Device Server for LOFAR2.0 """ + +import datetime +import numpy +from json import loads +from threading import Thread, Lock, Condition + # PyTango imports -from tango.server import attribute, command -from tango import AttrWriteType, DebugIt +from tango.server import attribute, command, device_property +from tango import AttrWriteType, DebugIt, DevVarStringArray, DevVarDoubleArray, DevString # Additional import from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.common.measures import get_measures_directory, get_available_measures_directories, download_measures, use_measures_directory, restart_python from tangostationcontrol.common.lofar_logging import log_exceptions +from tangostationcontrol.devices.device_decorators import TimeIt, only_in_states, fault_on_error +from tangostationcontrol.beam.delays import delay_calculator from tangostationcontrol.devices.lofar_device import lofar_device -__all__ = ["beam_device", "main"] +__all__ = ["beam_device", "main", "BeamTracker"] import logging logger = logging.getLogger() class beam_device(lofar_device): + # ----------------- + # Device Properties + # ----------------- + + Beam_tracking_interval = device_property( + dtype='DevFloat', + doc='Beam weights updating interval time [seconds]', + mandatory=False, + default_value = 10.0 + ) + + Beam_tracking_preparation_period = device_property( + dtype='DevFloat', + doc='Preparation time [seconds] needed before starting update operation', + mandatory=False, + default_value = 0.25 + ) # ---------- # Attributes # ---------- + # Maximum array size to allocate for the pointings, + # which must be enough for all derived classes. + # + # The actual number of pointings for this device + # will be stored as self._num_pointings. + MAX_POINTINGS = 1024 + + Pointing_direction_R = attribute(access=AttrWriteType.READ, + dtype=((numpy.str,),), max_dim_x=3, max_dim_y=MAX_POINTINGS, + fget=lambda self: self._pointing_direction_r) + + Pointing_direction_RW = attribute(access=AttrWriteType.READ_WRITE, + dtype=((numpy.str,),), max_dim_x=3, max_dim_y=MAX_POINTINGS, + fget=lambda self: self._pointing_direction_rw) + + Pointing_direction_str_R = attribute(access=AttrWriteType.READ, + doc='Pointing direction as a formatted string', + dtype=(numpy.str,), max_dim_x=MAX_POINTINGS, + fget=lambda self: ["{0} ({1}, {2})".format(*x) for x in self._pointing_direction_r]) + + Pointing_timestamp_R = attribute(access=AttrWriteType.READ, + dtype=(numpy.double,), max_dim_x=MAX_POINTINGS, + fget=lambda self: self._pointing_timestamp_r) + + Tracking_enabled_R = attribute(access=AttrWriteType.READ, + doc="Whether the tile beam is updated periodically", + dtype=numpy.bool, + fget=lambda self: self.Beam_tracker and self.Beam_tracker.is_alive()) + + Tracking_enabled_RW = attribute(access=AttrWriteType.READ_WRITE, + doc="Whether the tile beam should be updated periodically", + dtype=numpy.bool, + fget=lambda self: self._tracking_enabled_rw) + + Duration_update_pointing_R = attribute(access=AttrWriteType.READ, + dtype=numpy.float, fget=lambda self: self.update_pointing.statistics["last"] or 0) + + def write_Pointing_direction_RW(self, value): + """ Setter method for attribute Pointing_direction_RW """ + # verify whether values are valid + if len(value) != self._num_pointings: + raise ValueError(f"Expected {self._num_pointings} directions, got {len(value)}") + + for pointing in value: + if not self.generic_delay_calculator.is_valid_direction(pointing): + raise ValueError(f"Invalid direction: {pointing}") + + # store the new values + self._pointing_direction_rw = value + + # force update across tiles if pointing changes + self.Beam_tracker.force_update() + logger.info("Pointing direction update requested") + + def write_Tracking_enabled_RW(self, value): + self._tracking_enabled_rw = value + + if value: + self.Beam_tracker.start() + else: + self.Beam_tracker.stop() + + + @TimeIt() + def update_pointing(self, timestamp: datetime.datetime): + """ Update the weights for the configured pointings, for the given timestamp. """ + self._set_pointing(self._pointing_direction_rw, timestamp) + + # -------- + # abstract interface + # -------- + + def _delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime) -> numpy.array: + """ + Calculate the delay values based on the 2D pointing list (num_pointings x 3) and the timestamp + """ + + raise NotImplementedError + + def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime): + """ + Calculate and Upload the hardware-specific delay weights based on the 2D pointing list (num_pointings x 3) and the timestamp + + Also updates _pointing_direction_r and _pointing_timestamp_r according to which anetennas got their pointing updated. + """ + + raise NotImplementedError + + # -------- + # overloaded functions + # -------- + + def init_device(self): + super().init_device() + + # Initialise pointing array data and attribute + self._tracking_enabled_rw = False + + # thread to perform beam tracking + self.Beam_tracker = None + + # generic delay calculator to ask about validity of settings + self.generic_delay_calculator = delay_calculator([0, 0, 0]) + + # Derived classes will override this with a non-parameterised + # version that lofar_device will call. + @log_exceptions() + def configure_for_initialise(self, num_pointings): + super().configure_for_initialise() + + if not (0 < num_pointings <= self.MAX_POINTINGS): + raise ValueError(f"beam_device is configured to support 0 - {self.MAX_POINTINGS} pointings, but {num_pointings} were requested") + + # Initialise tracking control + self._num_pointings = num_pointings + self._pointing_timestamp_r = numpy.zeros(num_pointings, dtype=numpy.double) + self._pointing_direction_r = numpy.zeros((num_pointings, 3), dtype="<U32") + self._pointing_direction_rw = numpy.array([["AZELGEO","0deg","90deg"]] * num_pointings, dtype="<U32") + self._tracking_enabled_rw = True + + # Create a thread object to update beam weights + self.Beam_tracker = BeamTracker(self.Beam_tracking_interval, self.Beam_tracking_preparation_period, self.update_pointing, self.Fault) + + @log_exceptions() + def configure_for_on(self): + super().configure_for_on() + + # Start beam tracking thread + if self._tracking_enabled_rw: + self.Beam_tracker.start() + + @log_exceptions() + def configure_for_off(self): + if self.Beam_tracker: + # Stop thread object + self.Beam_tracker.stop() + + super().configure_for_off() + + + # -------- + # Commands + # -------- + + @command(dtype_in=DevVarStringArray) + @DebugIt() + @log_exceptions() + @only_in_states(lofar_device.DEFAULT_COMMAND_STATES) + def set_pointing(self, pointing_direction: list): + """ + Compute and uploads the hardware-specific delays based on a given pointing direction + 2D array (num_pointings x 3 parameters). + """ + + # Reshape the flatten input array + pointing_direction = numpy.array(pointing_direction).reshape(self._num_pointings, 3) + + self._set_pointing(pointing_direction, datetime.datetime.now()) + + @command(dtype_in = DevString) + @DebugIt() + @only_in_states(lofar_device.DEFAULT_COMMAND_STATES) + def set_pointing_for_specific_time(self, parameters: DevString = None): + """ + Compute and uploads the hardware-specific delays based on a given pointing direction + 2D array (num_pointings x 3 parameters) for the given timestamp + + Parameters are given in a json document. + pointing_direction: array of 96 casacore pointings (3 string parameters) + timestamp: POSIX timestamp + """ + pointing_parameters = loads(parameters) + + pointing_direction = list(pointing_parameters["pointing_direction"]) + timestamp_param = float(pointing_parameters["timestamp"]) + timestamp = datetime.datetime.fromtimestamp(timestamp_param) + + # Reshape the flatten pointing array + pointing_direction = numpy.array(pointing_direction).reshape(self._num_pointings, 3) + + self._set_pointing(pointing_direction, timestamp) + + + @command(dtype_in=DevVarStringArray, dtype_out=DevVarDoubleArray) + @DebugIt() + @log_exceptions() + @only_in_states(lofar_device.DEFAULT_COMMAND_STATES) + def delays(self, pointing_direction: numpy.array): + """ + Calculate the delays based on the pointing list and the timestamp + """ + + pointing_direction = numpy.array(pointing_direction).reshape(self._num_pointings, 3) + + delays = self._delays(pointing_direction, datetime.datetime.now()) + + return delays.flatten() + + # -------- + # Measures + # -------- + # Directory where the casacore measures that we use, reside. We configure ~/.casarc to # use the symlink /opt/IERS/current, which we switch to the actual set of files to use. measures_directory_R = attribute(dtype=str, access=AttrWriteType.READ, fget = lambda self: get_measures_directory()) @@ -35,10 +262,6 @@ class beam_device(lofar_device): # List of dowloaded measures (the latest 64, anyway) measures_directories_available_R = attribute(dtype=(str,), max_dim_x=64, access=AttrWriteType.READ, fget = lambda self: sorted(get_available_measures_directories())[-64:]) - # -------- - # Commands - # -------- - @command(dtype_out=str, doc_out="Name of newly installed measures directory") @DebugIt() @log_exceptions() @@ -76,3 +299,108 @@ class beam_device(lofar_device): def main(**kwargs): """Main function of the Docker module.""" return entry(beam_device, **kwargs) + +# ---------- +# Beam Tracker +# ---------- + +class BeamTracker(): + + DISCONNECT_TIMEOUT = 3.0 + + """ Object that encapsulates a Thread, resposible for beam tracking operations """ + def __init__(self, interval, preparation_period, update_pointing_callback, fault_callback): + self.thread = None + self.interval = interval + self.preparation_period = preparation_period + self.update_pointing_callback = update_pointing_callback + self.fault_callback = fault_callback + + # Condition to trigger a forced update or early abort + self.update_lock = Lock() + self.update_condition = Condition(self.update_lock) + + # Whether the pointing has to be forced updated + self.stale_pointing = True + + def start(self): + """ Starts the Beam Tracking thread """ + if self.thread: + # already started + return + + self.done = False + self.thread = Thread(target=self._update_pointing_direction, name="BeamTracker") + self.thread.start() + + logger.info("BeamTracking thread started") + + def is_alive(self): + """ Returns True just before the Thread run() method starts until just after the Thread run() method terminates. """ + return self.thread and self.thread.is_alive() + + def force_update(self): + """ Force the pointing to be updated. """ + + self.stale_pointing = True + self.notify_thread() + + def notify_thread(self): + # inform the thread to stop waiting + with self.update_lock: + self.update_condition.notify() + + def stop(self): + """ Stops the Beam Tracking loop """ + + if not self.thread: + return + + logger.info("BeamTracking thread stopping") + + self.done = True + self.force_update() + + # wait for thread to finish + self.thread.join(self.DISCONNECT_TIMEOUT) + + if self.is_alive(): + logger.error("BeamTracking Thread did not properly terminate") + + self.thread = None + + logger.info("BeamTracking thread stopped") + + def _get_sleep_time(self): + """ Computes the sleep time (in seconds) that needs to be waited for the next beam tracking update """ + now = datetime.datetime.now().timestamp() + + # Computes the left seconds before the next update + next_update_in = self.interval - (now % self.interval) + + # Computes the needed sleep time before the next update + sleep_time = next_update_in - self.preparation_period + # If sleep time is negative, add the tracking interval for the next update + if sleep_time < 0: + return sleep_time + self.interval + else: + return sleep_time + + # @fault_on_error routes errors here. we forward them to our device + def Fault(self, msg): + self.fault_callback(msg) + + @log_exceptions() + @fault_on_error() + def _update_pointing_direction(self): + """ Updates the beam weights using a fixed interval of time """ + + # Check if flag beamtracking is true + with self.update_lock: + while not self.done: + self.stale_pointing = False + self.update_pointing_callback(datetime.datetime.now()) + + # sleep until the next update, or when interrupted (this releases the lock, allowing for notification) + # note that we need wait_for as conditions can be triggered multiple times in succession + self.update_condition.wait_for(lambda: self.done or self.stale_pointing, self._get_sleep_time()) diff --git a/tangostationcontrol/tangostationcontrol/devices/device_decorators.py b/tangostationcontrol/tangostationcontrol/devices/device_decorators.py index 26a5c488ba0a4ae219e3bd783691491e9ff2f45c..b1dbb4b7a5b61bf80169e8945b7347f9d2e394fb 100644 --- a/tangostationcontrol/tangostationcontrol/devices/device_decorators.py +++ b/tangostationcontrol/tangostationcontrol/devices/device_decorators.py @@ -2,9 +2,10 @@ from tango import DevState from functools import wraps import logging +import time logger = logging.getLogger() -__all__ = ["only_in_states", "only_when_on", "fault_on_error"] +__all__ = ["only_in_states", "only_when_on", "fault_on_error", "TimeIt"] def only_in_states(allowed_states, log=True): """ @@ -63,3 +64,51 @@ def fault_on_error(): return error_wrapper return inner + + +def TimeIt(log_function=None): + """ + Wrapper to time calls. Stores the timing log in a + <function>.statistics property as a dict with the following + information: + + "count": number of times the function was called + "last": duration of the last invocation + "history": last 10 durations + + NOTE: If the function called throws an exception, timing information + is not logged or recorded. Those calls are expected to be + uncharacteristically cheap, after all. + """ + + def inner(func): + statistics = { + "count": 0, + "last": None, + "history": [], + } + + @wraps(func) + def timer_wrapper(self, *args, **kwargs): + # time function call + before = time.monotonic_ns() + result = func(self, *args, **kwargs) + after = time.monotonic_ns() + + # store measurement + statistics["count"] += 1 + statistics["last"] = (after - before) / 1e9 + statistics["history"] = statistics["history"][-9:] + [statistics["last"]] + + if log_function: + log_function(f"Call to {func.__name__} took {(after - before)/1e6} ms") + + # return function result (if any) + return result + + timer_wrapper.statistics = statistics + + return timer_wrapper + + return inner + diff --git a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py index 6f0f1de56b5bd0ade282aeede42fd719ce375b13..3fa662c0198b6f93b478a9fc09dbee6fd207e064 100644 --- a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py @@ -17,6 +17,7 @@ from tango import AttrWriteType, DevState, DebugIt, Attribute, DeviceProxy, Attr import time import math import numpy +import textwrap # Additional import from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper @@ -278,7 +279,7 @@ class lofar_device(Device, metaclass=DeviceMeta): default_value = getattr(self, f"{name}_default") # set the attribute to the configured default - logger.debug(f"Setting attribute {name} to {default_value}") + logger.debug(textwrap.shorten(f"Setting attribute {name} to {default_value}", 150)) self.proxy.write_attribute(name, default_value) except Exception as e: # log which attribute we're addressing diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py b/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py index ddd3e7899e354c3442b38c0df27737e267db0952..585f514bc0cbc1df43321b10474575ab78b64edd 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py @@ -8,19 +8,24 @@ """ # PyTango imports -from tango.server import device_property, command -from tango import AttrWriteType, DevVarFloatArray, DevVarULongArray +from tango.server import device_property, command, attribute +from tango import AttrWriteType, DevVarFloatArray, DevVarULongArray, DeviceProxy, DevSource, EventType # Additional import from tangostationcontrol.common.entrypoint import entry +from tangostationcontrol.common.lofar_logging import log_exceptions from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.devices.opcua_device import opcua_device from tangostationcontrol.devices.sdp.sdp import SDP import numpy +from functools import lru_cache __all__ = ["Beamlet", "main"] +import logging +logger = logging.getLogger() + class Beamlet(opcua_device): @@ -70,6 +75,12 @@ class Beamlet(opcua_device): default_value = [[0] * A_pn * N_pol * N_beamlets_ctrl] * N_pn ) + subband_select_RW_default = device_property( + dtype='DevVarULongArray', + mandatory=False, + default_value = [102] * N_beamlets_ctrl + ) + first_default_settings = [ 'FPGA_beamlet_output_hdr_eth_destination_mac_RW', 'FPGA_beamlet_output_hdr_ip_destination_address_RW', @@ -101,8 +112,8 @@ class Beamlet(opcua_device): # Select subband per dual-polarisation beamlet. # 0 for antenna polarization X in beamlet polarization X, # 1 for antenna polarization Y in beamlet polarization Y. - FPGA_beamlet_subband_select_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_subband_select_R"], datatype=numpy.uint16, dims=(A_pn * N_pol * N_beamlets_ctrl, N_pn)) - FPGA_beamlet_subband_select_RW = attribute_wrapper(comms_annotation=["FPGA_beamlet_subband_select_RW"], datatype=numpy.uint16, dims=(A_pn * N_pol * N_beamlets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) + FPGA_beamlet_subband_select_R = attribute_wrapper(comms_annotation=["FPGA_beamlet_subband_select_R"], datatype=numpy.uint32, dims=(A_pn * N_pol * N_beamlets_ctrl, N_pn)) + FPGA_beamlet_subband_select_RW = attribute_wrapper(comms_annotation=["FPGA_beamlet_subband_select_RW"], datatype=numpy.uint32, dims=(A_pn * N_pol * N_beamlets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) # cint16[N_pn][A_pn][N_pol][N_beamlets_ctrl] # Co-polarization BF weights. The N_pol = 2 parameter index is: @@ -134,6 +145,20 @@ class Beamlet(opcua_device): FPGA_bf_weights_yy_R = attribute_wrapper(comms_annotation=["FPGA_bf_weights_yy_R"], datatype=numpy.uint32, dims=(A_pn * N_beamlets_ctrl, N_pn)) FPGA_bf_weights_yy_RW = attribute_wrapper(comms_annotation=["FPGA_bf_weights_yy_RW"], datatype=numpy.uint32, dims=(A_pn * N_beamlets_ctrl, N_pn), access=AttrWriteType.READ_WRITE) + subband_select_RW = attribute(dtype=(numpy.uint32,), max_dim_x=N_beamlets_ctrl, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_Wrapper_allowed") + + def read_subband_select_RW(self): + return self._subband_select + + def write_subband_select_RW(self, subbands): + # Use the same subband for all inputs and polarisations of a beamlet + self.proxy.FPGA_beamlet_subband_select_RW = numpy.tile(subbands, (self.N_pn, self.A_pn * self.N_pol)) + + self.cache_clear() + + # Store new value + self._subband_select = subbands + # ---------- # Summarising Attributes # ---------- @@ -142,33 +167,149 @@ class Beamlet(opcua_device): # Overloaded functions # -------- + def configure_for_initialise(self): + super().configure_for_initialise() + + self._subband_select = numpy.zeros(self.N_beamlets_ctrl, dtype=numpy.uint32) + + self.sdp_proxy = DeviceProxy("STAT/SDP/1") + self.sdp_proxy.set_source(DevSource.DEV) + + # subscribe to events to notice setting changes in SDP that determine the input frequency + self.event_subscriptions = {} + self.event_subscriptions["clock_rw"] = self.sdp_proxy.subscribe_event("clock_RW", EventType.CHANGE_EVENT, self._frequency_change_event, stateless=True) + self.event_subscriptions["nyquist_zone_r"] = self.sdp_proxy.subscribe_event("nyquist_zone_R", EventType.CHANGE_EVENT, self._frequency_change_event, stateless=True) + + def configure_for_off(self): + super().configure_for_off() + + # unsubscribe from all events + for k,v in list(self.event_subscriptions.items()): + self.sdp_proxy.unsubscribe_event(v) + del self.event_subscriptions[k] + # -------- # internal functions # -------- - def _calculate_bf_weights(self, phases: numpy.ndarray): - """ Helper function that converts a difference in phase (in radiants) - to a FPGA weight (in complex number) """ - + + @log_exceptions() + def _frequency_change_event(self, event): + """ Trigger on external changes in frequency settings. """ + + if event.err: + # little we can do here. note that errors are also thrown if the device we subscribed to is offline + return + + logger.info(f"Received attribute change event from {event.device}: {event.attr_value.name} := {event.attr_value.value}") + + # invalidate caches for frequency-dependent values + self.cache_clear() + + @command() + def cache_clear(self): + """ Explicitly clear any caches. """ + + self._beamlet_frequencies.cache_clear() + + """ + The SDP FPGAs correct for signal-delay differences by rotating the phases of the antenna signals. A delay + is converted to a phase difference as follows: + + phase = 2 * pi * frequency * delay + + where 'frequency' is the subband frequency: + + LBA: frequency = (subband_nr + 0) * clock / 1024 + HBA: frequency = (subband_nr + 512) * clock / 1024 + + The beamformer combines a set of antennas for each beamlet, and each beamlet can have a different pointing + and subband selected. + + This results in an array of phase[antenna][beamlet] adjustments to be sent to the FPGA.The FPGA accepts + weights as a 16-bit (imag,real) complex pair packed into an uint32. + + The phases, delays, and final beam weights, all have shape (fpga_nr, [input_nr][pol][beamlet_nr]). + """ + + BF_UNIT_WEIGHT = 2**14 + + @staticmethod + def _phases_to_bf_weights(phases: numpy.ndarray): + """ Convert differences in phase (in radians) into FPGA weights (complex numbers packed into uint32). """ + + # flatten array and restore its shape later, which makes running over all elements a lot easier + orig_shape = phases.shape + phases = phases.flatten() + # Convert array values in complex numbers - unit = numpy.power(2,14) - real = numpy.array(unit * numpy.cos(phases), dtype=numpy.short) - imag = numpy.array(unit * numpy.sin(phases), dtype=numpy.short) - # join 16 bits of imaginary part (MSB) with 16 bits of real part (LSB) - bf_weights = numpy.array( numpy.frombuffer( b''.join([imag,real]), dtype=numpy.uint32 ) ) + real = Beamlet.BF_UNIT_WEIGHT * numpy.cos(phases) + imag = Beamlet.BF_UNIT_WEIGHT * numpy.sin(phases) + + # Interleave into (real, imag) pairs, and store as int16 + # see also https://stackoverflow.com/questions/5347065/interweaving-two-numpy-arrays/5347492 + # Round to nearest integer instead of rounding down + real_imag = numpy.empty(phases.size * 2, dtype=numpy.int16) + real_imag[0::2] = numpy.round(real) + real_imag[1::2] = numpy.round(imag) + + # Cast each (real, imag) pair into an uint32, which brings the array size + # back to the original. + bf_weights = real_imag.view(numpy.uint32) + + return bf_weights.reshape(orig_shape) + + @staticmethod + def _subband_frequencies(subbands: numpy.ndarray, clock: int, nyquist_zone: int) -> numpy.ndarray: + """ Obtain the frequencies of each subband, given a clock and an antenna type. """ + + subband_width = clock / 1024 + base_subband = nyquist_zone * 512 + + # broadcast clock across frequencies + frequencies = (subbands + base_subband) * subband_width + + return frequencies + + @lru_cache() # this function requires large hardware reads for values that don't change often + def _beamlet_frequencies(self): + """ Obtain the frequencies (in Hz) of each subband that is selected for each input and beamlet. + + Returns shape (fpga_nr, [input_nr][pol][beamlet_nr]). + """ + + # obtain which subband is selected for each input and beamlet + beamlet_subbands = self.read_attribute("FPGA_beamlet_subband_select_RW") # (fpga_nr, [input_nr][pol][beamlet_nr]) + return self._subband_frequencies(beamlet_subbands, self.sdp_proxy.clock_RW, self.sdp_proxy.nyquist_zone_R) + + @staticmethod + def _calculate_bf_weights(delays: numpy.ndarray, beamlet_frequencies: numpy.ndarray): + """ Helper function that converts a series of delays into FPGA_bf_weights_xx_yy. + All input and output arrays have the same dimensionality. + """ + + # compute the phases + beamlet_phases = (2.0 * numpy.pi) * beamlet_frequencies * delays + + # convert to weights + bf_weights = Beamlet._phases_to_bf_weights(beamlet_phases) + return bf_weights - + # -------- # Commands # -------- + @command(dtype_in=DevVarFloatArray, dtype_out=DevVarULongArray) - def calculate_bf_weights(self, phases: numpy.ndarray): - """ converts a difference in phase (in radiants) to a FPGA weight (in complex number) """ - + def calculate_bf_weights(self, delays: numpy.ndarray): + """ converts a difference in delays (in seconds) to a FPGA weight (in complex number) """ + # Calculate the FPGA weight array - bf_weights = self._calculate_bf_weights(phases) - - return bf_weights + delays = delays.reshape(self.N_pn, self.A_pn * self.N_pol * self.N_beamlets_ctrl) + beamlet_frequencies = self._beamlet_frequencies() + bf_weights = self._calculate_bf_weights(delays, beamlet_frequencies) + + return bf_weights.flatten() # ---------- # Run server diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py index 256bd5b2cccd8531b2e9ff9aeedcc1acf25cd425..9eb7533375f669acbcf652a5352deb47fdc5653e 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py @@ -8,29 +8,56 @@ """ # PyTango imports -#from tango.server import device_property -#from tango import AttrWriteType -# Additional import +from tango import Util, DeviceProxy, DevSource +from tango.server import attribute, AttrWriteType, device_property +# Additional import from tangostationcontrol.common.entrypoint import entry -#from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.devices.beam_device import beam_device +from tangostationcontrol.devices.sdp.beamlet import Beamlet +from tangostationcontrol.devices.device_decorators import TimeIt +from tangostationcontrol.common.lofar_logging import log_exceptions +from tangostationcontrol.beam.delays import delay_calculator + +import numpy +import datetime -#import numpy +import logging +logger = logging.getLogger() __all__ = ["DigitalBeam", "main"] class DigitalBeam(beam_device): - pass + NUM_ANTENNAS = 96 + NUM_BEAMLETS = 488 + # ----------------- # Device Properties # ----------------- + antenna_select_RW_default = device_property( + dtype='DevVarBooleanArray', + mandatory=False, + default_value = [[True] * NUM_BEAMLETS] * NUM_ANTENNAS + ) + # ---------- # Attributes # ---------- + Duration_delays_R = attribute(access=AttrWriteType.READ, + dtype=numpy.float, fget=lambda self: self._delays.statistics["last"] or 0) + + antenna_select_RW = attribute(doc='Selection of antennas to use for forming each beamlet', + dtype=((numpy.bool_,),), max_dim_x=NUM_BEAMLETS, max_dim_y=NUM_ANTENNAS, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_Wrapper_allowed") + + def read_antenna_select_RW(self): + return self._antenna_select + + def write_antenna_select_RW(self, antennas): + self._antenna_select = antennas + # ---------- # Summarising Attributes # ---------- @@ -39,6 +66,107 @@ class DigitalBeam(beam_device): # Overloaded functions # -------- + @log_exceptions() + def configure_for_initialise(self): + super().configure_for_initialise(self.NUM_BEAMLETS) + + # Set a reference of RECV device that is correlated to this BEAM device + util = Util.instance() + instance_number = self.get_name().split('/')[2] + + self.recv_proxy = DeviceProxy( + f"{util.get_ds_inst_name()}/RECV/{instance_number}") + self.recv_proxy.set_source(DevSource.DEV) + + self.beamlet_proxy = DeviceProxy( + f"{util.get_ds_inst_name()}/Beamlet/{instance_number}") + self.beamlet_proxy.set_source(DevSource.DEV) + + # Retrieve positions from RECV device + reference_itrf = self.recv_proxy.Antenna_Field_Reference_ITRF_R + antenna_itrf = self.recv_proxy.HBAT_reference_ITRF_R.reshape(self.NUM_ANTENNAS, 3) + + # a delay calculator + self.delay_calculator = delay_calculator(reference_itrf) + + # absolute positions of each antenna element + self.antenna_positions = antenna_itrf + + # use all antennas unless specified otherwise + self._antenna_select = numpy.ones((self.NUM_ANTENNAS, self.NUM_BEAMLETS), dtype=numpy.bool_) + + # -------- + # internal functions + # -------- + + @TimeIt() + def _delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime): + """ + Calculate the delays (in seconds) based on the pointing list and the timestamp, + + Returns delays[antenna][beamlet] + """ + + delays = numpy.zeros((self.NUM_ANTENNAS, self.NUM_BEAMLETS), dtype=numpy.float64) + + d = self.delay_calculator + d.set_measure_time(timestamp) + + for beamlet in range(self.NUM_BEAMLETS): + # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions + delays[:, beamlet] = d.convert(pointing_direction[beamlet], self.antenna_positions) + + return delays + + def _map_antennas_on_fpga_inputs(self, arr): + """ Converts an array with dimensions [antenna][beamlet] -> [fpga_nr][input_nr][pol_nr][beamlet] + by repeating the values for both polarisations. """ + + assert arr.shape == (self.NUM_ANTENNAS, self.NUM_BEAMLETS) + + # Each antenna maps on [fpga_nr][input_nr][0] and [fpga_nr][input_nr][1], and we work + # with [antenna][beamlet], so we have to interleave copies of the input array per beamlet + + # double the delays to cover both polarisations + # [[1,2,3], [4,5,6]] -> [[1,2,3,1,2,3], [4,5,6,4,5,6]] + result = numpy.hstack((arr,arr)) + + # move doubling of last dimension into first + # [[1,2,3,1,2,3], [4,5,6,4,5,6]] -> [[1,2,3], [1,2,3], [4,5,6], [4,5,6]] + result = result.reshape(result.shape[0] * 2, result.shape[1] // 2) + + # cast into (fpga_nr, [input_nr][pol_nr][beamlet]) + result = result.reshape((Beamlet.N_pn, Beamlet.A_pn * Beamlet.N_pol * Beamlet.N_beamlets_ctrl)) + + return result + + def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime): + """ + Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) + """ + + # Retrieve delays from casacore + antenna_delays = self._delays(pointing_direction, timestamp) + + # Map them onto the FPGA inputs + fpga_delays = self._map_antennas_on_fpga_inputs(antenna_delays) + + beam_weights = self.beamlet_proxy.calculate_bf_weights(fpga_delays.flatten()) + beam_weights = beam_weights.reshape((Beamlet.N_pn, Beamlet.A_pn * Beamlet.N_pol * Beamlet.N_beamlets_ctrl)) + + # Filter out unwanted antennas + beam_weights *= self._map_antennas_on_fpga_inputs(self._antenna_select) + + # Write weights to SDP + self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW = beam_weights + + # Record where we now point to, now that we've updated the weights. + # Only record pointings per beamlet, not which antennas took part + self._pointing_direction_r = pointing_direction + self._pointing_timestamp_r[:] = timestamp.timestamp() + + logger.info("Pointing direction updated") + # -------- # Commands # -------- diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py b/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py index 914c84441b2170a68cba9b0372639ea25bc76e4d..c5b2968d9715779907834de2d561fda82c842b90 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/sdp.py @@ -32,6 +32,13 @@ class SDP(opcua_device): # Device Properties # ----------------- + AntennaType = device_property( + doc='Antenna type (LBA or HBA) we control', + dtype='DevString', + mandatory=False, + default_value = "HBA" + ) + TR_fpga_mask_RW_default = device_property( dtype='DevVarBooleanArray', mandatory=False, @@ -89,6 +96,12 @@ class SDP(opcua_device): default_value=[[8192] * 12 * 512] * 16 ) + clock_RW_default = device_property( + dtype='DevULong', + mandatory=False, + default_value = 200 * 1000000 + ) + translator_default_settings = [ 'TR_fpga_mask_RW' ] @@ -173,6 +186,63 @@ class SDP(opcua_device): FPGA_bst_offload_bsn_R = attribute_wrapper(comms_annotation=["FPGA_bst_offload_bsn_R"], datatype=numpy.int64, dims=(N_pn, N_beamsets_ctrl)) + antenna_type_R = attribute(doc='Type of antenna (LBA or HBA) attached to the FPGAs', + dtype=numpy.str, fget=lambda self: self.AntennaType) + nyquist_zone_R = attribute(doc='Nyquist zone of the input frequencies', + dtype=numpy.uint32, fisallowed="is_attribute_wrapper_allowed", + polling_period=1000, abs_change=1) + clock_RW = attribute(doc='Configured sampling clock (Hz)', + dtype=numpy.uint32, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_wrapper_allowed", + polling_period=1000, abs_change=1) + + def _nyquist_zone(self, clock): + """ Return the Nyquist zone for the given clock (in Hz). + + The Nyquist zone determines the frequency offset of + the antennas. + + NOTE: Only 160 and 200 MHz clocks are supported. """ + + # (AntennaType, clockMHz) -> Nyquist zone + nyquist_zones = { + ("LBA", 160): 0, + ("LBA", 200): 0, + ("HBA", 160): 1, + ("HBA", 200): 2, + } + + try: + return nyquist_zones[(self.AntennaType), clock // 1000000] + except KeyError: + raise ValueError(f"Could not determine Nyquist zone for antenna type {self.AntennaType} with clock {clock} Hz") + + def read_nyquist_zone_R(self): + try: + return self._nyquist_zone(self.read_attribute("clock_RW")) + except ValueError: + # Supply a sane default for computations in tests until L2SDP-725 allows us to read back the set clock + return 0 + + def read_clock_RW(self): + # We can only return a single value, so we assume the FPGA is configured coherently. Which is something + # that is to be checked by an independent monitoring system anyway. + mask = self.read_attribute("TR_fpga_mask_RW") + clocks = self.read_attribute("FPGA_pps_expected_cnt_RW") + clocks_in_mask = [clock for idx,clock in enumerate(clocks) if mask[idx]] + + # We return first setting within the mask. If there are no FPGAs selected at all, just return a sane default. + return numpy.uint32(clocks_in_mask[0]) if clocks_in_mask else self.clock_RW_default + + def write_clock_RW(self, clock): + if clock not in (160*1000000, 200*1000000): + raise ValueError(f"Unsupported clock frequency: {clock}") + + # Tell all FPGAs to use this clock + self.proxy.FPGA_pps_expected_cnt_RW = [clock] * self.N_pn + + # Also update the packet headers + self.proxy.FPGA_sdp_info_nyquist_sampling_zone_index_RW = [self._nyquist_zone(clock)] * self.N_pn + # ---------- # Summarising Attributes # ---------- diff --git a/tangostationcontrol/tangostationcontrol/devices/tilebeam.py b/tangostationcontrol/tangostationcontrol/devices/tilebeam.py index 0c8eb401f9fd6be41677d0295e9463c6d1f8ebe9..ae86554f71871a5c7dd3774393677f27cf856eff 100644 --- a/tangostationcontrol/tangostationcontrol/devices/tilebeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/tilebeam.py @@ -9,100 +9,41 @@ import numpy import datetime -from json import loads -from threading import Thread, Lock, Condition -from tango import AttrWriteType, DebugIt, DeviceProxy, DevVarStringArray, DevVarDoubleArray, DevString, DevSource -from tango.server import attribute, command, device_property +from tango import DeviceProxy, DevSource from tango import Util # Additional import from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions from tangostationcontrol.beam.delays import delay_calculator -from tangostationcontrol.devices.device_decorators import only_in_states, fault_on_error from tangostationcontrol.devices.beam_device import beam_device import logging logger = logging.getLogger() - -__all__ = ["TileBeam", "main", "BeamTracker"] +__all__ = ["TileBeam", "main"] @device_logging_to_python() class TileBeam(beam_device): + NUM_TILES = 96 # ----------------- # Device Properties # ----------------- - Beam_tracking_interval = device_property( - dtype='DevFloat', - doc='Beam weights updating interval time [seconds]', - mandatory=False, - default_value = 10.0 - ) - - Beam_tracking_preparation_period = device_property( - dtype='DevFloat', - doc='Preparation time [seconds] needed before starting update operation', - mandatory=False, - default_value = 0.25 - ) - # ---------- # Attributes # ---------- - Pointing_direction_R = attribute(access=AttrWriteType.READ, - dtype=((numpy.str,),), max_dim_x=3, max_dim_y=96, - fget=lambda self: self._pointing_direction_r) - - Pointing_direction_RW = attribute(access=AttrWriteType.READ_WRITE, - dtype=((numpy.str,),), max_dim_x=3, max_dim_y=96, - fget=lambda self: self._pointing_direction_rw) - - Pointing_direction_str_R = attribute(access=AttrWriteType.READ, - doc='Pointing direction as a formatted string', - dtype=(numpy.str,), max_dim_x=96, - fget=lambda self: ["{0} ({1}, {2})".format(*x) for x in self._pointing_direction_r]) - - Pointing_timestamp_R = attribute(access=AttrWriteType.READ, - dtype=(numpy.double,), max_dim_x=96, - fget=lambda self: self._pointing_timestamp_r) - - Tracking_enabled_R = attribute(access=AttrWriteType.READ, - doc="Whether the tile beam is updated periodically", - dtype=numpy.bool, - fget=lambda self: self.Beam_tracker.is_alive()) - - Tracking_enabled_RW = attribute(access=AttrWriteType.READ_WRITE, - doc="Whether the tile beam should be updated periodically", - dtype=numpy.bool, - fget=lambda self: self._tracking_enabled_rw) - # -------- # overloaded functions # -------- - def init_device(self): - super().init_device() - - # thread to perform beam tracking - self.Beam_tracker = None - @log_exceptions() def configure_for_initialise(self): - super().configure_for_initialise() - - # Initialise pointing array data and attribute - self._pointing_timestamp_r = numpy.zeros(96, dtype=numpy.double) - self._pointing_direction_r = numpy.zeros((96,3), dtype="<U32") - self._pointing_direction_rw = numpy.array([["AZELGEO","0deg","90deg"]] * 96, dtype="<U32") - - # Initialise tracking control - self._tracking_enabled_rw = True + super().configure_for_initialise(self.NUM_TILES) # Set a reference of RECV device that is correlated to this BEAM device util = Util.instance() @@ -113,71 +54,28 @@ class TileBeam(beam_device): # Retrieve positions from RECV device HBAT_reference_itrf = self.recv_proxy.HBAT_reference_itrf_R - HBAT_antenna_itrf_offsets = self.recv_proxy.HBAT_antenna_itrf_offsets_R.reshape(96,16,3) + HBAT_antenna_itrf_offsets = self.recv_proxy.HBAT_antenna_itrf_offsets_R.reshape(self.NUM_TILES, 16, 3) # a delay calculator for each tile self.HBAT_delay_calculators = [delay_calculator(reference_itrf) for reference_itrf in HBAT_reference_itrf] # absolute positions of each antenna element - self.HBAT_antenna_positions = [HBAT_reference_itrf[tile] + HBAT_antenna_itrf_offsets[tile] for tile in range(96)] - - # Create a thread object to update beam weights - self.Beam_tracker = BeamTracker(self) - - @log_exceptions() - def configure_for_on(self): - super().configure_for_on() - - # Start beam tracking thread - if self._tracking_enabled_rw: - self.Beam_tracker.start() - - @log_exceptions() - def configure_for_off(self): - if self.Beam_tracker: - # Stop thread object - self.Beam_tracker.stop() - - super().configure_for_off() + self.HBAT_antenna_positions = [HBAT_reference_itrf[tile] + HBAT_antenna_itrf_offsets[tile] for tile in range(self.NUM_TILES)] # -------- # internal functions # -------- - def write_Pointing_direction_RW(self, value): - """ Setter method for attribute Pointing_direction_RW """ - # verify whether values are valid - for tile in range(96): - if not self.HBAT_delay_calculators[tile].is_valid_direction(value[tile]): - raise ValueError(f"Invalid direction: {value[tile]}") - - self._pointing_direction_rw = value - - # force update across tiles if pointing changes - self.Beam_tracker.force_update() - logger.info("Pointing direction update requested") - - def write_Tracking_enabled_RW(self, value): - self._tracking_enabled_rw = value - - if value: - self.Beam_tracker.start() - else: - self.Beam_tracker.stop() - - def _delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = None): - """ - Calculate the delays (in seconds) based on the pointing list and the timestamp + def _delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime): """ + Calculate the delays (in seconds) based on the pointing list and the timestamp. - if not timestamp: - "B008 Do not perform function calls in argument defaults. The call" - "is performed only once at function definition time." - timestamp = datetime.datetime.now() + Returns delays[tile][element] + """ - delays = numpy.zeros((96,16), dtype=numpy.float64) + delays = numpy.zeros((self.NUM_TILES, 16), dtype=numpy.float64) - for tile in range(96): + for tile in range(self.NUM_TILES): # initialise delay calculator d = self.HBAT_delay_calculators[tile] d.set_measure_time(timestamp) @@ -187,23 +85,18 @@ class TileBeam(beam_device): return delays - def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime = None): + def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime): """ Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) """ - if not timestamp: - "B008 Do not perform function calls in argument defaults. The call" - "is performed only once at function definition time." - timestamp = datetime.datetime.now() - # Retrieve delays from casacore delays = self._delays(pointing_direction, timestamp) # Convert delays into beam weights delays = delays.flatten() HBAT_bf_delay_steps = self.recv_proxy.calculate_HBAT_bf_delay_steps(delays) - HBAT_bf_delay_steps = numpy.array(HBAT_bf_delay_steps, dtype=numpy.int64).reshape(96,32) + HBAT_bf_delay_steps = numpy.array(HBAT_bf_delay_steps, dtype=numpy.int64).reshape(self.NUM_TILES, 32) # Write weights to RECV self.recv_proxy.HBAT_BF_delay_steps_RW = HBAT_bf_delay_steps @@ -211,7 +104,7 @@ class TileBeam(beam_device): # Record where we now point to, now that we've updated the weights. # Only the entries within the mask have been updated mask = self.recv_proxy.ANT_mask_RW.flatten() - for rcu in range(96): + for rcu in range(self.NUM_TILES): if mask[rcu]: self._pointing_direction_r[rcu] = pointing_direction[rcu] self._pointing_timestamp_r[rcu] = timestamp.timestamp() @@ -221,174 +114,9 @@ class TileBeam(beam_device): # Commands # -------- - @command(dtype_in=DevVarStringArray, dtype_out=DevVarDoubleArray) - @DebugIt() - @log_exceptions() - @only_in_states(beam_device.DEFAULT_COMMAND_STATES) - def delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = None): - """ - Calculate the delays (in seconds) based on the pointing list and the timestamp - TBD: antenna and reference positions will be retrieved from RECV and not stored as BEAM device properties - """ - - if not timestamp: - "B008 Do not perform function calls in argument defaults. The call" - "is performed only once at function definition time." - timestamp = datetime.datetime.now() - - pointing_direction = numpy.array(pointing_direction).reshape(96,3) - - delays = self._delays(pointing_direction, timestamp) - - return delays.flatten() - - @command(dtype_in=DevVarStringArray) - @DebugIt() - @log_exceptions() - @only_in_states(beam_device.DEFAULT_COMMAND_STATES) - def set_pointing(self, pointing_direction: list, timestamp: datetime.datetime = None): - """ - Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) - """ - - if not timestamp: - "B008 Do not perform function calls in argument defaults. The call" - "is performed only once at function definition time." - timestamp = datetime.datetime.now() - - # Reshape the flatten input array - pointing_direction = numpy.array(pointing_direction).reshape(96,3) - - self._set_pointing(pointing_direction, timestamp) - - @command(dtype_in = DevString) - @DebugIt() - @only_in_states(beam_device.DEFAULT_COMMAND_STATES) - def set_pointing_for_specific_time(self, parameters: DevString = None): - """ - Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) - for the given timestamp - - Parameters are given in a json document. - pointing_direction: array of 96 casacore pointings (3 string parameters) - timestamp: POSIX timestamp - """ - pointing_parameters = loads(parameters) - - pointing_direction = list(pointing_parameters["pointing_direction"]) - timestamp_param = float(pointing_parameters["timestamp"]) - timestamp = datetime.datetime.fromtimestamp(timestamp_param) - - # Reshape the flatten pointing array - pointing_direction = numpy.array(pointing_direction).reshape(96,3) - - self._set_pointing(pointing_direction, timestamp) - - # ---------- # Run server # ---------- def main(**kwargs): """Main function of the ObservationControl module.""" return entry(TileBeam, **kwargs) - -# ---------- -# Beam Tracker -# ---------- -class BeamTracker(): - - DISCONNECT_TIMEOUT = 3.0 - - """ Object that encapsulates a Thread, resposible for beam tracking operations """ - def __init__(self, device: beam_device): - self.thread = None - self.device = device - - # Condition to trigger a forced update or early abort - self.update_lock = Lock() - self.update_condition = Condition(self.update_lock) - - # Whether the pointing has to be forced updated - self.stale_pointing = True - - def start(self): - """ Starts the Beam Tracking thread """ - if self.thread: - # already started - return - - self.done = False - self.thread = Thread(target=self._update_pointing_direction, name=f"BeamTracker of {self.device.get_name()}") - self.thread.start() - - logger.info("BeamTracking thread started") - - def is_alive(self): - """ Returns True just before the Thread run() method starts until just after the Thread run() method terminates. """ - return self.thread and self.thread.is_alive() - - def force_update(self): - """ Force the pointing to be updated. """ - - self.stale_pointing = True - self.notify_thread() - - def notify_thread(self): - # inform the thread to stop waiting - with self.update_lock: - self.update_condition.notify() - - def stop(self): - """ Stops the Beam Tracking loop """ - - if not self.thread: - return - - logger.info("BeamTracking thread stopping") - - self.done = True - self.force_update() - - # wait for thread to finish - self.thread.join(self.DISCONNECT_TIMEOUT) - - if self.is_alive(): - logger.error("BeamTracking Thread did not properly terminate") - - self.thread = None - - logger.info("BeamTracking thread stopped") - - def _get_sleep_time(self): - """ Computes the sleep time (in seconds) that needs to be waited for the next beam tracking update """ - now = datetime.datetime.now().timestamp() - - # Computes the left seconds before the next update - next_update_in = self.device.Beam_tracking_interval - (now % self.device.Beam_tracking_interval) - - # Computes the needed sleep time before the next update - sleep_time = next_update_in - self.device.Beam_tracking_preparation_period - # If sleep time is negative, add the tracking interval for the next update - if sleep_time < 0: - return sleep_time + self.device.Beam_tracking_interval - else: - return sleep_time - - # @fault_on_error routes errors here. we forward them to our device - def Fault(self, msg): - self.device.Fault(msg) - - @log_exceptions() - @fault_on_error() - def _update_pointing_direction(self): - """ Updates the beam weights using a fixed interval of time """ - - # Check if flag beamtracking is true - with self.update_lock: - while not self.done: - self.stale_pointing = False - self.device._set_pointing(self.device._pointing_direction_rw, datetime.datetime.now()) - - # sleep until the next update, or when interrupted (this releases the lock, allowing for notification) - # note that we need wait_for as conditions can be triggered multiple times in succession - self.update_condition.wait_for(lambda: self.done or self.stale_pointing, self._get_sleep_time()) diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_beamlet.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_beamlet.py index dcabcafc1f5e6f8dead16ffc4f913068b4309943..d76ee10e4f15664f176175a34c031c3008872b61 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_beamlet.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_beamlet.py @@ -11,6 +11,11 @@ from .base import AbstractTestBases from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy +import numpy +import numpy.testing +import time +from ctypes import c_short + class TestDeviceBeamlet(AbstractTestBases.TestDeviceBase): def setUp(self): @@ -30,3 +35,55 @@ class TestDeviceBeamlet(AbstractTestBases.TestDeviceBase): sdp_proxy.warm_boot() sdp_proxy.set_defaults() return sdp_proxy + + def test_pointing_to_zenith(self): + # Setup configuration + sdp_proxy = self.setup_sdp() + + self.proxy.initialise() + self.proxy.subband_select = [0] * 488 # no other value will be read back from SDP yet, see L2SDP-725 + self.proxy.on() + + # The subband frequency of HBA subband 0 is 200 MHz, + # so its period is 5 ns. A delay of 2.5e-9 seconds is thus half a period, + # and result in a 180 degree phase offset. + delays = numpy.array([[2.5e-9] * 192] * 488) + + calculated_bf_weights = self.proxy.calculate_bf_weights(delays.flatten()) + + # With a unit weight of 2**14, we thus expect beamformer weights of -2**14 + 0j, + # which is 49152 when read as an uint32. + self.assertEqual(-2**14, c_short(49152).value) # check our calculations + expected_bf_weights = numpy.array([49152] * 192 * 488, dtype=numpy.uint32) + + # disabled until L2SDP-725 is fixed, as the read back clock cannot be anything else than 0 + #numpy.testing.assert_almost_equal(expected_bf_weights, calculated_bf_weights) + + def test_sdp_clock_change(self): + # Setup configuration + sdp_proxy = self.setup_sdp() + + self.proxy.initialise() + self.proxy.subband_select = [0] * 488 # no other value will be read back from SDP yet, see L2SDP-725 + self.proxy.on() + + # any non-zero delay should result in different weights for different clocks + delays = numpy.array([[2.5e-9] * 192] * 488) + + sdp_proxy.clock_RW = 200 * 1000000 + time.sleep(1) # wait for beamlet device to process change event + calculated_bf_weights_200 = self.proxy.calculate_bf_weights(delays.flatten()) + + sdp_proxy.clock_RW = 160 * 1000000 + time.sleep(1) # wait for beamlet device to process change event + calculated_bf_weights_160 = self.proxy.calculate_bf_weights(delays.flatten()) + + sdp_proxy.clock_RW = 200 * 1000000 + time.sleep(1) # wait for beamlet device to process change event + calculated_bf_weights_200_v2 = self.proxy.calculate_bf_weights(delays.flatten()) + + # outcome should be changed back and forth across clock changes + # disabled until L2SDP-725 is fixed, as the read back clock cannot be anything else than 0 + #numpy.testing.assert_((calculated_bf_weights_200 != calculated_bf_weights_160).all()) + #numpy.testing.assert_((calculated_bf_weights_200 == calculated_bf_weights_200_v2).all()) + diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py index c023f256b1bdcb8cf5a935053fcba493fe2546dd..3a3fa57eff087ac3a04650731ab2ef3700104f8a 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py @@ -7,10 +7,47 @@ # # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. + +from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy + from .base import AbstractTestBases +import numpy + class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): def setUp(self): """Intentionally recreate the device object in each test""" super().setUp("STAT/DigitalBeam/1") + + self.recv_proxy = self.setup_recv_proxy() + self.beamlet_proxy = self.setup_beamlet_proxy() + + def setup_recv_proxy(self): + recv_proxy = TestDeviceProxy("STAT/RECV/1") + recv_proxy.off() + recv_proxy.warm_boot() + recv_proxy.set_defaults() + return recv_proxy + + def setup_beamlet_proxy(self): + beamlet_proxy = TestDeviceProxy("STAT/Beamlet/1") + beamlet_proxy.off() + beamlet_proxy.warm_boot() + beamlet_proxy.set_defaults() + return beamlet_proxy + + def test_pointing_to_zenith(self): + # Setup beamlet configuration + self.beamlet_proxy.clock_RW = 200 * 1000000 + self.beamlet_proxy.subband_select = list(range(488)) + + self.proxy.initialise() + self.proxy.Tracking_enabled_RW = False + self.proxy.on() + + # Point to Zenith + # + # Cannot test what was written to SDP due to L2SDP-725 + # so we just call the command, checking whether no exception is thrown + self.proxy.set_pointing(numpy.array([["AZELGEO","0deg","90deg"]] * 488).flatten()) diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_beamlet_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_beamlet_device.py new file mode 100644 index 0000000000000000000000000000000000000000..81b34f00e8135563b22af10bb72d53ea7fceab13 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_beamlet_device.py @@ -0,0 +1,127 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +from tangostationcontrol.devices.sdp.beamlet import Beamlet + +import numpy +import numpy.testing +from ctypes import c_short + +from tangostationcontrol.test import base + +# unpack into 16-bit complex +def to_complex(uint32): + imag = c_short(uint32 >> 16).value + real = c_short(uint32 & 0xFFFF).value + unit = Beamlet.BF_UNIT_WEIGHT + + return (real + 1j * imag) / unit + +class TestBeamletDevice(base.TestCase): + def test_phases_to_bf_weights(self): + # offer nice 0, 90, 180, 270, 360 degrees + phases = numpy.array([0.0, numpy.pi / 2, numpy.pi, numpy.pi * 1.5, numpy.pi * 2]) + unit = Beamlet.BF_UNIT_WEIGHT + + bf_weights = Beamlet._phases_to_bf_weights(phases) + + # check whether the complex representation is also along the right axes and + # has the right amplitude + self.assertEqual(to_complex(bf_weights[0]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[1]), 0 + 1j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[2]), -1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[3]), 0 - 1j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[4]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + + def test_calculate_bf_weights_small_numbers(self): + # 2 beamlets, 3 antennas. The antennas are 1 second apart. + delays = numpy.array([ + [1.0, 2.0, 3.0], + [1.0, 2.0, 3.0], + ]) + + # the frequency of the signal is 1.0 Hz and 0.5 Hz respectively, + # so the antennas will be either in phase or in opposite phase + beamlet_frequencies = numpy.array([ + [1.0, 1.0, 1.0], + [0.5, 0.5, 0.5], + ]) + + bf_weights = Beamlet._calculate_bf_weights(delays, beamlet_frequencies) + + self.assertEqual(delays.shape, bf_weights.shape) + + self.assertEqual(to_complex(bf_weights[0][0]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[0][1]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[0][2]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[1][0]), -1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[1][1]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[1][2]), -1 + 0j, msg=f"bf_weights = {bf_weights}") + + def test_calculate_bf_weights_actual_numbers(self): + # 2 beamlets, 3 antennas. The antennas are 1 second apart. + delays = numpy.array([ + [0.0, 2.5e-9, 5.0e-9] + ]) + + # the frequency of the signal is 1.0 Hz and 0.5 Hz respectively, + # so the antennas will be either in phase or in opposite phase + beamlet_frequencies = numpy.array([ + [200e6, 200e6, 200e6] + ]) + + bf_weights = Beamlet._calculate_bf_weights(delays, beamlet_frequencies) + + self.assertEqual(delays.shape, bf_weights.shape) + + self.assertEqual(to_complex(bf_weights[0][0]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[0][1]), -1 + 0j, msg=f"bf_weights = {bf_weights}") + self.assertEqual(to_complex(bf_weights[0][2]), 1 + 0j, msg=f"bf_weights = {bf_weights}") + + def test_subband_frequencies(self): + subbands = numpy.array([ + [0, 1, 102], + ]) + + clocks = numpy.array([ + 200 * 1000000, + 160 * 1000000 + ]) + + subband_width = 200e6 / 1024 + + # for reference values, see https://proxy.lofar.eu/rtsm/tests/ + + lba_frequencies = Beamlet._subband_frequencies(subbands, 160 * 1000000, 0) + self.assertAlmostEqual(lba_frequencies[0][0], 0.0000000e6) + self.assertAlmostEqual(lba_frequencies[0][1], 0.1562500e6) + self.assertAlmostEqual(lba_frequencies[0][2], 15.9375000e6) + + lba_frequencies = Beamlet._subband_frequencies(subbands, 200 * 1000000, 0) + self.assertAlmostEqual(lba_frequencies[0][0], 0.0000000e6) + self.assertAlmostEqual(lba_frequencies[0][1], 0.1953125e6) + self.assertAlmostEqual(lba_frequencies[0][2], 19.9218750e6) + + # Nyquist zone 1 is not used in 160 MHz + + hba_low_frequencies = Beamlet._subband_frequencies(subbands, 200 * 1000000, 1) + self.assertAlmostEqual(hba_low_frequencies[0][0], 100.0000000e6) + self.assertAlmostEqual(hba_low_frequencies[0][1], 100.1953125e6) + self.assertAlmostEqual(hba_low_frequencies[0][2], 119.9218750e6) + + hba_high_frequencies = Beamlet._subband_frequencies(subbands, 160 * 1000000, 2) + self.assertAlmostEqual(hba_high_frequencies[0][0], 160.0000000e6) + self.assertAlmostEqual(hba_high_frequencies[0][1], 160.1562500e6) + self.assertAlmostEqual(hba_high_frequencies[0][2], 175.9375000e6) + + hba_high_frequencies = Beamlet._subband_frequencies(subbands, 200 * 1000000, 2) + self.assertAlmostEqual(hba_high_frequencies[0][0], 200.0000000e6) + self.assertAlmostEqual(hba_high_frequencies[0][1], 200.1953125e6) + self.assertAlmostEqual(hba_high_frequencies[0][2], 219.9218750e6) +