From 1e780c27f6fd766a1f8d9bc3b34323485ff3bdc0 Mon Sep 17 00:00:00 2001 From: stedif <stefano.difrischia@inaf.it> Date: Tue, 1 Feb 2022 16:55:38 +0100 Subject: [PATCH] L2SS-574: move method _calculate_HBAT_bf_delays to RECV --- .../tangostationcontrol/devices/beam.py | 28 +------------ .../tangostationcontrol/devices/recv.py | 40 ++++++++++++++++++- 2 files changed, 41 insertions(+), 27 deletions(-) diff --git a/tangostationcontrol/tangostationcontrol/devices/beam.py b/tangostationcontrol/tangostationcontrol/devices/beam.py index 6b3cff43d..2304a0782 100644 --- a/tangostationcontrol/tangostationcontrol/devices/beam.py +++ b/tangostationcontrol/tangostationcontrol/devices/beam.py @@ -9,8 +9,7 @@ import numpy import datetime -from functools import partial -from tango.server import attribute, command, device_property +from tango.server import attribute, command from tango import AttrWriteType, DebugIt, DevState, DeviceProxy, DevVarStringArray, DevVarDoubleArray # Additional import @@ -94,28 +93,6 @@ class Beam(lofar_device): return delays - @staticmethod - def _calculate_HBAT_bf_delays(delays: numpy.ndarray, HBAT_signal_input_delays: numpy.ndarray, HBAT_bf_delay_step_delays: numpy.ndarray): - """ - Helper function that converts a signal path delay (in seconds) to an analog beam weight, - which is a value per tile per dipole per polarisation. - """ - # Duplicate delay values per polarisation - polarised_delays = numpy.tile(delays, 2) # output dims -> 96x32 - - # Add signal input delay - calibrated_delays = numpy.add(polarised_delays, HBAT_signal_input_delays) - - # Find the right delay step by looking for the closest match in property RECV-> HBAT_bf_delay_step_delays - HBAT_bf_delays = numpy.zeros((96,32), dtype=numpy.int64) - distance = lambda x , y : numpy.absolute(x-y) - for tile in range(96): - for at in range(32): - delay = calibrated_delays[tile,at] - step = min(HBAT_bf_delay_step_delays,key=partial(distance,delay)) - HBAT_bf_delays[tile,at] = numpy.where(HBAT_bf_delay_step_delays==step)[0][0] - return HBAT_bf_delays - def _HBAT_set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()): """ Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) @@ -124,8 +101,7 @@ class Beam(lofar_device): delays = self._HBAT_delays(pointing_direction, timestamp) # Convert delays into beam weights - HBAT_bf_delay_step_delays = self.recv_proxy.get_hbat_bf_delay_step_delays() - HBAT_bf_delays = self._calculate_HBAT_bf_delays(delays, self.HBAT_signal_input_delays, HBAT_bf_delay_step_delays) + HBAT_bf_delays = self.recv_proxy.calculate_HBAT_bf_delays(delays) # Write weights to RECV self.recv_proxy.HBAT_BF_delays_RW = HBAT_bf_delays diff --git a/tangostationcontrol/tangostationcontrol/devices/recv.py b/tangostationcontrol/tangostationcontrol/devices/recv.py index c6c2e979c..587f6dda5 100644 --- a/tangostationcontrol/tangostationcontrol/devices/recv.py +++ b/tangostationcontrol/tangostationcontrol/devices/recv.py @@ -12,7 +12,7 @@ """ # PyTango imports -from email.policy import default +from functools import partial from tango import DebugIt from tango.server import command from tango.server import device_property, attribute @@ -153,6 +153,30 @@ class RECV(opcua_device): # overloaded functions # -------- + # -------- + # internal functions + # -------- + def _calculate_HBAT_bf_delays(self, delays: numpy.ndarray): + """ + Helper function that converts a signal path delay (in seconds) to an analog beam weight, + which is a value per tile per dipole per polarisation. + """ + # Duplicate delay values per polarisation + polarised_delays = numpy.tile(delays, 2) # output dims -> 96x32 + + # Add signal input delay + calibrated_delays = numpy.add(polarised_delays, self.HBAT_signal_input_delays) + + # Find the right delay step by looking for the closest match in property RECV-> HBAT_bf_delay_step_delays + HBAT_bf_delays = numpy.zeros((96,32), dtype=numpy.int64) + distance = lambda x , y : numpy.absolute(x-y) + for tile in range(96): + for at in range(32): + delay = calibrated_delays[tile,at] + step = min(self.HBAT_bf_delay_step_delays,key=partial(distance,delay)) + HBAT_bf_delays[tile,at] = numpy.where(self.HBAT_bf_delay_step_delays==step)[0][0] + return HBAT_bf_delays + # -------- # Commands # -------- @@ -183,6 +207,20 @@ class RECV(opcua_device): def get_hbat_signal_input_delays(self): """ Return the property HBAT_signal_input_delays (96x32) into a flatten array """ return self.HBAT_signal_input_delays.flatten() + + @command(dtype_in=DevVarFloatArray, dtype_out=DevVarFloatArray) + @DebugIt() + @only_in_states([DevState.ON]) + def calculate_HBAT_bf_delays(self, delays: numpy.ndarray): + """ converts a signal path delay (in seconds) to an analog beam weight """ + + # Reshape the flatten input array + delays = numpy.array(delays).reshape(96,16) + + # Calculate the beam weight array + HBAT_bf_delays = self._calculate_HBAT_bf_delays(delays) + + return HBAT_bf_delays.flatten() @command() @DebugIt() -- GitLab