Skip to content
Snippets Groups Projects
Commit d33224c8 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-480: Split commands and functions to have all the pytango command...

L2SS-480: Split commands and functions to have all the pytango command restrictions separated from the logic of our functions.
parent 9c073205
No related branches found
No related tags found
1 merge request!220Resolve L2SS-480 "Delays to beam weights"
......@@ -95,6 +95,78 @@ class Beam(lofar_device):
logger.warning("RECV device offline") # avoid error raising in unittest
super().configure_for_initialise()
# --------
# internal functions
# --------
def _set_pointing_directions(self, new_points: numpy.array):
if new_points.shape != (96,3):
raise ValueError(f"New pointing directions given as shape {new_points.shape}, must be (96,3)")
self._hbat_pointing_direction = new_points
def _set_pointing_epochs(self, new_points: numpy.array):
if new_points.shape != (96,):
raise ValueError(f"New pointing epochs given as shape {new_points.shape}, must be (96,)")
self._hbat_pointing_epoch = new_points
def _HBAT_delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()):
"""
Calculate the delays (in seconds) based on the pointing list and the timestamp
TBD: antenna and reference positions will be retrieved from RECV and not stored as BEAM device properties
"""
delays = numpy.zeros((96,16), dtype=numpy.float64)
for tile in range(96):
# initialise delay calculator
d = delay_calculator(self.reference_itrf[tile])
d.set_measure_time(timestamp)
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions
delays[tile] = d.convert(pointing_direction[tile], self.antenna_itrf[tile])
return delays
def _calculate_HBAT_bf_delays(self, delays: numpy.ndarray):
"""
Helper function that converts a signal path delay (in seconds) to an analog beam weight,
which is a value per tile per dipole per polarisation.
"""
# Duplicate delay values per polarisation
polarised_delays = numpy.tile(delays, 2) # output dims -> 96x32
# Divide by 0.5ns and round
HBAT_bf_delays = numpy.array(polarised_delays / 0.5e-09, dtype=numpy.int64)
return HBAT_bf_delays
def _HBAT_set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()):
"""
Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters)
"""
# Retrieve delays from casacore
delays = self._HBAT_delays(pointing_direction, timestamp)
# Convert delays into beam weights
HBAT_bf_delays = self._calculate_HBAT_bf_delays(delays)
# Write weights to RECV
self.recv_proxy.HBAT_BF_delays_RW = HBAT_bf_delays
# Record where we now point to, now that we've updated the weights.
# Only the entries within the mask have been updated:
# mask = False -> retain value, mask = True -> use new value
mask = self.recv_proxy.RCU_mask_RW
pointing_epochs = numpy.tile([timestamp], 96)
for tile in range(96):
if not mask[tile]:
pointing_direction[tile] = self.hbat_pointing_direction[tile]
pointing_epochs[tile] = self.hbat_pointing_epoch[tile]
self._set_pointing_directions(pointing_direction)
self._set_pointing_epochs(pointing_epochs)
# --------
# Commands
# --------
......@@ -103,16 +175,15 @@ class Beam(lofar_device):
@command(dtype_in=(numpy.str,))
@only_in_states([DevState.STANDBY, DevState.ON])
def set_pointing_directions(self, new_points: numpy.array):
self._hbat_pointing_direction = numpy.array(new_points).reshape(96,3)
new_points = numpy.array(new_points).reshape(96,3)
self._set_pointing_directions(new_points)
@DebugIt()
@command(dtype_in=(numpy.double,))
@only_in_states([DevState.STANDBY, DevState.ON])
def set_pointing_epochs(self, new_points: numpy.array):
if new_points.shape != (96,):
raise ValueError(f"New direction epochs given as shape {new_points.shape}, must be (96,)")
self._hbat_pointing_epoch = new_points
self._set_pointing_epochs(new_points)
@command(dtype_out=str, doc_out="Name of newly installed measures directory")
@DebugIt()
......@@ -156,13 +227,7 @@ class Beam(lofar_device):
"""
pointing_direction = numpy.array(pointing_direction).reshape(96,3)
delays = numpy.zeros((96,16),dtype=numpy.float64)
for tile in range(0,96):
# initialise delay calculator
d = delay_calculator(self.reference_itrf[tile])
d.set_measure_time(timestamp)
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions
delays[tile] = d.convert(pointing_direction[tile], self.antenna_itrf[tile])
delays = self._HBAT_delays(pointing_direction, timestamp)
return delays.flatten()
......@@ -175,11 +240,12 @@ class Beam(lofar_device):
Helper function that converts a signal path delay (in seconds) to an analog beam weight,
which is a value per tile per dipole per polarisation.
"""
# Duplicate delay values per polarisation
polarised_delays = numpy.tile(delays,2) # output dims -> 96x32
# Divide by 0.5ns and round
HBAT_bf_delays = numpy.array(polarised_delays / 0.5e-09, dtype=numpy.int64)
return HBAT_bf_delays
delays = delays.reshape(96,16)
result = self._calculate_HBAT_bf_delays(delays)
return result.flatten()
@command(dtype_in=DevVarStringArray)
@DebugIt()
......@@ -188,19 +254,9 @@ class Beam(lofar_device):
"""
Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters)
"""
# Retrieve delays from casacore
delays = self.HBAT_delays(pointing_direction,timestamp)
# Convert delays into beam weights
HBAT_bf_delays = self.calculate_HBAT_bf_delays(delays)
# Write weights to RECV
self.recv_proxy.write_attribute('HBAT_BF_delays_RW',HBAT_bf_delays)
pointing_direction = numpy.array(pointing_direction).reshape(96,3)
# Record where we now point to, now that we've updated the weights.
# Only the entries within the mask have been updated:
# mask = False -> retain value, mask = True -> use new value
mask = self.recv_proxy.RCU_mask_RW
self.set_pointing_directions((self.hbat_pointing_direction * ~mask) + (pointing_direction * mask))
self.set_pointing_epochs((self.hbat_pointing_epoch * ~mask) + (numpy.tile([timestamp],96) * mask))
self._HBAT_set_pointing(pointing_direction, timestamp)
# ----------
# Run server
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment