Skip to content
Snippets Groups Projects
Commit 9d8e5e7c authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'L2SS-578-precise-hbat-delay-steps' into 'master'

Resolve L2SS-578 "Precise hbat delay steps"

Closes L2SS-578

See merge request !228
parents 7ee4bcd4 425bceba
No related branches found
No related tags found
1 merge request!228Resolve L2SS-578 "Precise hbat delay steps"
......@@ -9,6 +9,7 @@
import numpy
import datetime
from functools import partial
from tango.server import attribute, command, device_property
from tango import AttrWriteType, DebugIt, DevState, DeviceProxy, DevVarStringArray, DevVarDoubleArray
......@@ -105,7 +106,7 @@ class Beam(lofar_device):
return delays
@staticmethod
def _calculate_HBAT_bf_delays(delays: numpy.ndarray, HBAT_signal_input_delays: numpy.ndarray):
def _calculate_HBAT_bf_delays(delays: numpy.ndarray, HBAT_signal_input_delays: numpy.ndarray, HBAT_bf_delay_step_delays: numpy.ndarray):
"""
Helper function that converts a signal path delay (in seconds) to an analog beam weight,
which is a value per tile per dipole per polarisation.
......@@ -114,11 +115,16 @@ class Beam(lofar_device):
polarised_delays = numpy.tile(delays, 2) # output dims -> 96x32
# Add signal input delay
summed_delays = numpy.add(polarised_delays,HBAT_signal_input_delays)
# Divide by 0.5ns and round
HBAT_bf_delays = numpy.array(summed_delays / 0.5e-09, dtype=numpy.int64)
calibrated_delays = numpy.add(polarised_delays, HBAT_signal_input_delays)
# Find the right delay step by looking for the closest match in property RECV-> HBAT_bf_delay_step_delays
HBAT_bf_delays = numpy.zeros((96,32), dtype=numpy.int64)
distance = lambda x , y : numpy.absolute(x-y)
for tile in range(96):
for at in range(32):
delay = calibrated_delays[tile,at]
step = min(HBAT_bf_delay_step_delays,key=partial(distance,delay))
HBAT_bf_delays[tile,at] = numpy.where(HBAT_bf_delay_step_delays==step)[0][0]
return HBAT_bf_delays
def _HBAT_set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()):
......@@ -129,7 +135,8 @@ class Beam(lofar_device):
delays = self._HBAT_delays(pointing_direction, timestamp)
# Convert delays into beam weights
HBAT_bf_delays = self._calculate_HBAT_bf_delays(delays, self.HBAT_signal_input_delays)
HBAT_bf_delay_step_delays = self.recv_proxy.get_hbat_bf_delay_step_delays()
HBAT_bf_delays = self._calculate_HBAT_bf_delays(delays, self.HBAT_signal_input_delays, HBAT_bf_delay_step_delays)
# Write weights to RECV
self.recv_proxy.HBAT_BF_delays_RW = HBAT_bf_delays
......
......@@ -15,7 +15,7 @@
from tango import DebugIt
from tango.server import command
from tango.server import device_property, attribute
from tango import AttrWriteType, DevState
from tango import AttrWriteType, DevState, DevVarFloatArray
import numpy
# Additional import
......@@ -32,6 +32,7 @@ __all__ = ["RECV", "main"]
@device_logging_to_python()
class RECV(opcua_device):
# -----------------
# Device Properties
# -----------------
......@@ -48,6 +49,20 @@ class RECV(opcua_device):
default_value=[True] * 32
)
HBAT_bf_delay_step_delays = device_property(
dtype="DevVarFloatArray",
mandatory=False,
default_value=numpy.array([
0.0, 0.5228E-9, 0.9797E-9, 1.4277E-9, 1.9055E-9,
2.4616E-9, 2.9539E-9, 3.4016E-9, 3.8076E-9, 4.3461E-9,
4.9876E-9, 5.4894E-9, 5.7973E-9, 6.2707E-9, 6.8628E-9,
7.3989E-9, 8.0673E-9, 8.6188E-9, 9.1039E-9, 9.5686E-9,
10.0463E-9, 10.5774E-9, 11.0509E-9, 11.5289E-9, 11.9374E-9,
12.4524E-9, 13.0842E-9, 13.5936E-9, 13.9198E-9, 14.4087E-9,
14.9781E-9, 15.5063E-9
],dtype=numpy.float64)
)
first_default_settings = [
# set the masks first, as those filter any subsequent settings
'ANT_mask_RW',
......@@ -111,6 +126,13 @@ class RECV(opcua_device):
# --------
# Commands
# --------
@command(dtype_out=DevVarFloatArray)
@DebugIt()
@only_in_states([DevState.ON])
def get_hbat_bf_delay_step_delays(self):
""" Return the property HBAT_bf_delay_step_delays """
return self.HBAT_bf_delay_step_delays
@command()
@DebugIt()
@only_in_states([DevState.STANDBY, DevState.ON])
......
......@@ -27,12 +27,27 @@ class TestDeviceBeam(AbstractTestBases.TestDeviceBase):
recv_proxy = TestDeviceProxy("STAT/RECV/1")
recv_proxy.off()
recv_proxy.initialise()
recv_proxy.set_defaults()
recv_proxy.on()
self.proxy.recv_proxy = recv_proxy
# Verify attribute is present (all zeros if never used before)
HBAT_delays_r1 = numpy.array(recv_proxy.read_attribute('HBAT_BF_delays_RW').value)
self.assertIsNotNone(HBAT_delays_r1)
# Unable to independently test '_calculate_HBAT_bf_delays' because it is not a Tango command,
# thus DeviceProxy cannot access it. On the other hand, the method cannot be unit-tested because it
# requires access to a DeviceRecv property from DeviceBeam. This last requirement should change with L2SS-574
#
# verify if values are actually transformed
# HBAT_delays_flat = self.proxy.HBAT_delays(numpy.array([["J2000","0deg","0deg"]] * 96).flatten())
# HBAT_delays = numpy.array(HBAT_delays_flat).reshape(96,16)
# HBAT_signal_input_delays = numpy.zeros((96,32), dtype=numpy.float64) # Property of Beam-device
# HBAT_bf_delay_step_delays = recv_proxy.get_hbat_bf_delay_step_delays()
# HBAT_bf_delays = self.proxy._calculate_HBAT_bf_delays(HBAT_delays, HBAT_signal_input_delays, HBAT_bf_delay_step_delays)
# self.assertNotEqual(HBAT_delays, HBAT_bf_delays)
# Verify writing operation does not lead to errors
self.proxy.HBAT_set_pointing(numpy.array([["J2000","0deg","0deg"]] * 96).flatten()) # write values to RECV
HBAT_delays_r2 = numpy.array(recv_proxy.read_attribute('HBAT_BF_delays_RW').value)
......
......@@ -71,11 +71,3 @@ class TestBeamDevice(base.TestCase):
# verify property is retrieved (workaround)
HBAT_signal_input_delays = beam.Beam.HBAT_signal_input_delays.default_value
self.assertTrue((HBAT_signal_input_delays==numpy.zeros((96,32), dtype=numpy.float64)).all())
# verify if values are actually transformed
HBAT_delays_flat = proxy.HBAT_delays(numpy.array([["J2000","0deg","0deg"]] * 96).flatten())
HBAT_delays = numpy.array(HBAT_delays_flat).reshape(96,16)
HBAT_bf_delays = beam.Beam._calculate_HBAT_bf_delays(HBAT_delays, HBAT_signal_input_delays)
self.assertNotEqual(HBAT_delays, HBAT_bf_delays)
self.assertTrue(HBAT_bf_delays[0][0]==numpy.int64((HBAT_delays[0][0]+HBAT_signal_input_delays[0][0])/0.5e-09))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment