Skip to content
Snippets Groups Projects
Commit 1e780c27 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

L2SS-574: move method _calculate_HBAT_bf_delays to RECV

parent c25c6d14
No related branches found
No related tags found
1 merge request!234Resolve L2SS-574 "Move hbat code to recv"
...@@ -9,8 +9,7 @@ ...@@ -9,8 +9,7 @@
import numpy import numpy
import datetime import datetime
from functools import partial from tango.server import attribute, command
from tango.server import attribute, command, device_property
from tango import AttrWriteType, DebugIt, DevState, DeviceProxy, DevVarStringArray, DevVarDoubleArray from tango import AttrWriteType, DebugIt, DevState, DeviceProxy, DevVarStringArray, DevVarDoubleArray
# Additional import # Additional import
...@@ -94,28 +93,6 @@ class Beam(lofar_device): ...@@ -94,28 +93,6 @@ class Beam(lofar_device):
return delays return delays
@staticmethod
def _calculate_HBAT_bf_delays(delays: numpy.ndarray, HBAT_signal_input_delays: numpy.ndarray, HBAT_bf_delay_step_delays: numpy.ndarray):
"""
Helper function that converts a signal path delay (in seconds) to an analog beam weight,
which is a value per tile per dipole per polarisation.
"""
# Duplicate delay values per polarisation
polarised_delays = numpy.tile(delays, 2) # output dims -> 96x32
# Add signal input delay
calibrated_delays = numpy.add(polarised_delays, HBAT_signal_input_delays)
# Find the right delay step by looking for the closest match in property RECV-> HBAT_bf_delay_step_delays
HBAT_bf_delays = numpy.zeros((96,32), dtype=numpy.int64)
distance = lambda x , y : numpy.absolute(x-y)
for tile in range(96):
for at in range(32):
delay = calibrated_delays[tile,at]
step = min(HBAT_bf_delay_step_delays,key=partial(distance,delay))
HBAT_bf_delays[tile,at] = numpy.where(HBAT_bf_delay_step_delays==step)[0][0]
return HBAT_bf_delays
def _HBAT_set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()): def _HBAT_set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()):
""" """
Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters)
...@@ -124,8 +101,7 @@ class Beam(lofar_device): ...@@ -124,8 +101,7 @@ class Beam(lofar_device):
delays = self._HBAT_delays(pointing_direction, timestamp) delays = self._HBAT_delays(pointing_direction, timestamp)
# Convert delays into beam weights # Convert delays into beam weights
HBAT_bf_delay_step_delays = self.recv_proxy.get_hbat_bf_delay_step_delays() HBAT_bf_delays = self.recv_proxy.calculate_HBAT_bf_delays(delays)
HBAT_bf_delays = self._calculate_HBAT_bf_delays(delays, self.HBAT_signal_input_delays, HBAT_bf_delay_step_delays)
# Write weights to RECV # Write weights to RECV
self.recv_proxy.HBAT_BF_delays_RW = HBAT_bf_delays self.recv_proxy.HBAT_BF_delays_RW = HBAT_bf_delays
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
""" """
# PyTango imports # PyTango imports
from email.policy import default from functools import partial
from tango import DebugIt from tango import DebugIt
from tango.server import command from tango.server import command
from tango.server import device_property, attribute from tango.server import device_property, attribute
...@@ -153,6 +153,30 @@ class RECV(opcua_device): ...@@ -153,6 +153,30 @@ class RECV(opcua_device):
# overloaded functions # overloaded functions
# -------- # --------
# --------
# internal functions
# --------
def _calculate_HBAT_bf_delays(self, delays: numpy.ndarray):
"""
Helper function that converts a signal path delay (in seconds) to an analog beam weight,
which is a value per tile per dipole per polarisation.
"""
# Duplicate delay values per polarisation
polarised_delays = numpy.tile(delays, 2) # output dims -> 96x32
# Add signal input delay
calibrated_delays = numpy.add(polarised_delays, self.HBAT_signal_input_delays)
# Find the right delay step by looking for the closest match in property RECV-> HBAT_bf_delay_step_delays
HBAT_bf_delays = numpy.zeros((96,32), dtype=numpy.int64)
distance = lambda x , y : numpy.absolute(x-y)
for tile in range(96):
for at in range(32):
delay = calibrated_delays[tile,at]
step = min(self.HBAT_bf_delay_step_delays,key=partial(distance,delay))
HBAT_bf_delays[tile,at] = numpy.where(self.HBAT_bf_delay_step_delays==step)[0][0]
return HBAT_bf_delays
# -------- # --------
# Commands # Commands
# -------- # --------
...@@ -183,6 +207,20 @@ class RECV(opcua_device): ...@@ -183,6 +207,20 @@ class RECV(opcua_device):
def get_hbat_signal_input_delays(self): def get_hbat_signal_input_delays(self):
""" Return the property HBAT_signal_input_delays (96x32) into a flatten array """ """ Return the property HBAT_signal_input_delays (96x32) into a flatten array """
return self.HBAT_signal_input_delays.flatten() return self.HBAT_signal_input_delays.flatten()
@command(dtype_in=DevVarFloatArray, dtype_out=DevVarFloatArray)
@DebugIt()
@only_in_states([DevState.ON])
def calculate_HBAT_bf_delays(self, delays: numpy.ndarray):
""" converts a signal path delay (in seconds) to an analog beam weight """
# Reshape the flatten input array
delays = numpy.array(delays).reshape(96,16)
# Calculate the beam weight array
HBAT_bf_delays = self._calculate_HBAT_bf_delays(delays)
return HBAT_bf_delays.flatten()
@command() @command()
@DebugIt() @DebugIt()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment