Skip to content
Snippets Groups Projects
Commit 2a3b16cb authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-615: Rename HBAT_bf_delays -> HBAT_bf_delay_steps to better describe its...

L2SS-615: Rename HBAT_bf_delays -> HBAT_bf_delay_steps to better describe its unit (they're steps, not seconds)
parent ad091512
No related branches found
No related tags found
1 merge request!237L2SS-615: Rename HBAT_bf_delays -> HBAT_bf_delay_steps to better describe its unit
......@@ -102,11 +102,11 @@ class Beam(lofar_device):
# Convert delays into beam weights
delays = delays.flatten()
HBAT_bf_delays = self.recv_proxy.calculate_HBAT_bf_delays(delays)
HBAT_bf_delays = numpy.array(HBAT_bf_delays, dtype=numpy.int64).reshape(96,32)
HBAT_bf_delay_steps = self.recv_proxy.calculate_HBAT_bf_delay_steps(delays)
HBAT_bf_delay_steps = numpy.array(HBAT_bf_delay_steps, dtype=numpy.int64).reshape(96,32)
# Write weights to RECV
self.recv_proxy.HBAT_BF_delays_RW = HBAT_bf_delays
self.recv_proxy.HBAT_BF_delay_steps_RW = HBAT_bf_delay_steps
# Record where we now point to, now that we've updated the weights.
# Only the entries within the mask have been updated
......
......@@ -109,8 +109,8 @@ class RECV(opcua_device):
# The HBAT beamformer delays represent 32 delays for each of the 96 inputs.
# The 32 delays deconstruct as delays[polarisation][dipole], and each delay is the number of 'delay steps' to apply (0.5ns for HBAT1).
HBAT_BF_delays_R = attribute_wrapper(comms_annotation=["HBAT_BF_delays_R" ],datatype=numpy.int64 , dims=(32,96))
HBAT_BF_delays_RW = attribute_wrapper(comms_annotation=["HBAT_BF_delays_RW" ],datatype=numpy.int64 , dims=(32,96), access=AttrWriteType.READ_WRITE)
HBAT_BF_delay_steps_R = attribute_wrapper(comms_annotation=["HBAT_BF_delay_steps_R" ],datatype=numpy.int64 , dims=(32,96))
HBAT_BF_delay_steps_RW = attribute_wrapper(comms_annotation=["HBAT_BF_delay_steps_RW" ],datatype=numpy.int64 , dims=(32,96), access=AttrWriteType.READ_WRITE)
HBAT_LED_on_R = attribute_wrapper(comms_annotation=["HBAT_LED_on_R" ],datatype=numpy.bool_ , dims=(32,96))
HBAT_LED_on_RW = attribute_wrapper(comms_annotation=["HBAT_LED_on_RW" ],datatype=numpy.bool_ , dims=(32,96), access=AttrWriteType.READ_WRITE)
HBAT_PWR_LNA_on_R = attribute_wrapper(comms_annotation=["HBAT_PWR_LNA_on_R" ],datatype=numpy.bool_ , dims=(32,96))
......@@ -156,7 +156,7 @@ class RECV(opcua_device):
# --------
# internal functions
# --------
def _calculate_HBAT_bf_delays(self, delays: numpy.ndarray):
def _calculate_HBAT_bf_delay_steps(self, delays: numpy.ndarray):
"""
Helper function that converts a signal path delay (in seconds) to an analog beam weight,
which is a value per tile per dipole per polarisation.
......@@ -168,14 +168,14 @@ class RECV(opcua_device):
calibrated_delays = numpy.add(polarised_delays, self.HBAT_signal_input_delays)
# Find the right delay step by looking for the closest match in property RECV-> HBAT_bf_delay_step_delays
HBAT_bf_delays = numpy.zeros((96,32), dtype=numpy.int64)
HBAT_bf_delay_steps = numpy.zeros((96,32), dtype=numpy.int64)
distance = lambda x , y : numpy.absolute(x-y)
for tile in range(96):
for at in range(32):
delay = calibrated_delays[tile,at]
step = min(self.HBAT_bf_delay_step_delays,key=partial(distance,delay))
HBAT_bf_delays[tile,at] = numpy.where(self.HBAT_bf_delay_step_delays==step)[0][0]
return HBAT_bf_delays
HBAT_bf_delay_steps[tile,at] = numpy.where(self.HBAT_bf_delay_step_delays==step)[0][0]
return HBAT_bf_delay_steps
# --------
# Commands
......@@ -211,16 +211,16 @@ class RECV(opcua_device):
@command(dtype_in=DevVarFloatArray, dtype_out=DevVarFloatArray)
@DebugIt()
@only_in_states([DevState.ON])
def calculate_HBAT_bf_delays(self, delays: numpy.ndarray):
def calculate_HBAT_bf_delay_steps(self, delays: numpy.ndarray):
""" converts a signal path delay (in seconds) to an analog beam weight """
# Reshape the flatten input array
delays = numpy.array(delays).reshape(96,16)
# Calculate the beam weight array
HBAT_bf_delays = self._calculate_HBAT_bf_delays(delays)
HBAT_bf_delay_steps = self._calculate_HBAT_bf_delay_steps(delays)
return HBAT_bf_delays.flatten()
return HBAT_bf_delay_steps.flatten()
@command()
@DebugIt()
......
......@@ -60,11 +60,11 @@ class TestDeviceBeam(AbstractTestBases.TestDeviceBase):
self.assertEqual(DevState.ON, self.proxy.state())
# Verify attribute is present (all zeros if never used before)
HBAT_delays_r1 = numpy.array(recv_proxy.read_attribute('HBAT_BF_delays_RW').value)
HBAT_delays_r1 = numpy.array(recv_proxy.read_attribute('HBAT_BF_delay_steps_RW').value)
self.assertIsNotNone(HBAT_delays_r1)
self.proxy.HBAT_set_pointing(self.pointing_direction) # write values to RECV
HBAT_delays_r2 = numpy.array(recv_proxy.read_attribute('HBAT_BF_delays_RW').value)
HBAT_delays_r2 = numpy.array(recv_proxy.read_attribute('HBAT_BF_delay_steps_RW').value)
self.assertIsNotNone(HBAT_delays_r2)
# Verify delays changed (to be discussed)
......
......@@ -64,10 +64,10 @@ class TestRecvDevice(base.TestCase):
self.init_device(proxy)
self.assertEqual(3072, len(proxy.get_hbat_signal_input_delays())) # 96x32=3072
def test_calculate_HBAT_bf_delays(self):
def test_calculate_HBAT_bf_delay_steps(self):
"""Verify HBAT beamforming calculations are correctly executed"""
with DeviceTestContext(recv.RECV, properties=self.recv_properties, process=True, timeout=10) as proxy:
self.init_device(proxy)
delays = numpy.random.rand(96,16).flatten()
HBAT_bf_delays = proxy.calculate_HBAT_bf_delays(delays)
self.assertEqual(3072, len(HBAT_bf_delays)) # 96x32=3072
HBAT_bf_delay_steps = proxy.calculate_HBAT_bf_delay_steps(delays)
self.assertEqual(3072, len(HBAT_bf_delay_steps)) # 96x32=3072
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment