Skip to content
Snippets Groups Projects
Commit e21958f4 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'L2SS-688-remove-hbat-prefixes' into 'master'

Resolve L2SS-688 "Remove hbat prefixes"

Closes L2SS-688

See merge request !295
parents c807192c d959b52e
No related branches found
No related tags found
1 merge request!295Resolve L2SS-688 "Remove hbat prefixes"
...@@ -8,19 +8,19 @@ Beam Tracking ...@@ -8,19 +8,19 @@ Beam Tracking
Beam tracking automatically recomputes and reapplies pointings periodically, and immediately when new pointings are configured. It exposes the following interface: Beam tracking automatically recomputes and reapplies pointings periodically, and immediately when new pointings are configured. It exposes the following interface:
:HBAT_tracking_enabled_R: Whether beam tracking is running. :Tracking_enabled_R: Whether beam tracking is running.
:type: ``bool`` :type: ``bool``
:HBAT_pointing_direction_RW: The direction in which the beam should be tracked for each antenna. The beam tracker will steer the beam periodically, and explicitly whenever the pointings change. :Pointing_direction_RW: The direction in which the beam should be tracked for each antenna. The beam tracker will steer the beam periodically, and explicitly whenever the pointings change.
:type: ``str[N_ant][3]`` :type: ``str[N_ant][3]``
:HBAT_pointing_direction_R: The last applied pointing of each antenna. :Pointing_direction_R: The last applied pointing of each antenna.
:type: ``str[N_ant][3]`` :type: ``str[N_ant][3]``
:HBAT_pointing_timestamp_R: The timestamp for which the last set pointing for each antenna was applied and set (in seconds since 1970). :Pointing_timestamp_R: The timestamp for which the last set pointing for each antenna was applied and set (in seconds since 1970).
:type: ``float[N_ant][3]`` :type: ``float[N_ant][3]``
...@@ -29,11 +29,11 @@ Beam Steering ...@@ -29,11 +29,11 @@ Beam Steering
The beam steering is responsible for pointing the beams at a target, by converting pointings to ``recv.HBAT_bf_delay_steps``. The beam steering is typically controlled by the beam tracker. To point the antennas in any direction manually, you should disable beam tracking first: The beam steering is responsible for pointing the beams at a target, by converting pointings to ``recv.HBAT_bf_delay_steps``. The beam steering is typically controlled by the beam tracker. To point the antennas in any direction manually, you should disable beam tracking first:
:HBAT_tracking_enabled_RW: Enable or disable beam tracking (default: ``True``). :Tracking_enabled_RW: Enable or disable beam tracking (default: ``True``).
:type: ``bool`` :type: ``bool``
:HBAT_set_pointing(pointings): Point the beams towards the specified ``pointings[N_ant][3]`` for all antennas (for which ``recv.ANT_mask_RW`` is set). :set_pointing(pointings): Point the beams towards the specified ``pointings[N_ant][3]`` for all antennas (for which ``recv.ANT_mask_RW`` is set).
:returns: ``None`` :returns: ``None``
......
...@@ -37,14 +37,14 @@ class TileBeam(beam_device): ...@@ -37,14 +37,14 @@ class TileBeam(beam_device):
# Device Properties # Device Properties
# ----------------- # -----------------
HBAT_beam_tracking_interval = device_property( Beam_tracking_interval = device_property(
dtype='DevFloat', dtype='DevFloat',
doc='HBAT beam weights updating interval time [seconds]', doc='Beam weights updating interval time [seconds]',
mandatory=False, mandatory=False,
default_value = 10.0 default_value = 10.0
) )
HBAT_beam_tracking_preparation_period = device_property( Beam_tracking_preparation_period = device_property(
dtype='DevFloat', dtype='DevFloat',
doc='Preparation time [seconds] needed before starting update operation', doc='Preparation time [seconds] needed before starting update operation',
mandatory=False, mandatory=False,
...@@ -55,27 +55,27 @@ class TileBeam(beam_device): ...@@ -55,27 +55,27 @@ class TileBeam(beam_device):
# Attributes # Attributes
# ---------- # ----------
HBAT_pointing_direction_R = attribute(access=AttrWriteType.READ, Pointing_direction_R = attribute(access=AttrWriteType.READ,
dtype=((numpy.str,),), max_dim_x=3, max_dim_y=96, dtype=((numpy.str,),), max_dim_x=3, max_dim_y=96,
fget=lambda self: self._hbat_pointing_direction_r) fget=lambda self: self._pointing_direction_r)
HBAT_pointing_direction_RW = attribute(access=AttrWriteType.READ_WRITE, Pointing_direction_RW = attribute(access=AttrWriteType.READ_WRITE,
dtype=((numpy.str,),), max_dim_x=3, max_dim_y=96, dtype=((numpy.str,),), max_dim_x=3, max_dim_y=96,
fget=lambda self: self._hbat_pointing_direction_rw) fget=lambda self: self._pointing_direction_rw)
HBAT_pointing_timestamp_R = attribute(access=AttrWriteType.READ, Pointing_timestamp_R = attribute(access=AttrWriteType.READ,
dtype=(numpy.double,), max_dim_x=96, dtype=(numpy.double,), max_dim_x=96,
fget=lambda self: self._hbat_pointing_timestamp_r) fget=lambda self: self._pointing_timestamp_r)
HBAT_tracking_enabled_R = attribute(access=AttrWriteType.READ, Tracking_enabled_R = attribute(access=AttrWriteType.READ,
doc="Whether the HBAT tile beam is updated periodically", doc="Whether the tile beam is updated periodically",
dtype=numpy.bool, dtype=numpy.bool,
fget=lambda self: self.HBAT_beam_tracker.is_alive()) fget=lambda self: self.Beam_tracker.is_alive())
HBAT_tracking_enabled_RW = attribute(access=AttrWriteType.READ_WRITE, Tracking_enabled_RW = attribute(access=AttrWriteType.READ_WRITE,
doc="Whether the HBAT tile beam should be updated periodically", doc="Whether the tile beam should be updated periodically",
dtype=numpy.bool, dtype=numpy.bool,
fget=lambda self: self._hbat_tracking_enabled_rw) fget=lambda self: self._tracking_enabled_rw)
# -------- # --------
# overloaded functions # overloaded functions
...@@ -85,19 +85,19 @@ class TileBeam(beam_device): ...@@ -85,19 +85,19 @@ class TileBeam(beam_device):
super().init_device() super().init_device()
# thread to perform beam tracking # thread to perform beam tracking
self.HBAT_beam_tracker = None self.Beam_tracker = None
@log_exceptions() @log_exceptions()
def configure_for_initialise(self): def configure_for_initialise(self):
super().configure_for_initialise() super().configure_for_initialise()
# Initialise pointing array data and attribute # Initialise pointing array data and attribute
self._hbat_pointing_timestamp_r = numpy.zeros(96, dtype=numpy.double) self._pointing_timestamp_r = numpy.zeros(96, dtype=numpy.double)
self._hbat_pointing_direction_r = numpy.zeros((96,3), dtype="<U32") self._pointing_direction_r = numpy.zeros((96,3), dtype="<U32")
self._hbat_pointing_direction_rw = numpy.array([["AZELGEO","0deg","90deg"]] * 96, dtype="<U32") self._pointing_direction_rw = numpy.array([["AZELGEO","0deg","90deg"]] * 96, dtype="<U32")
# Initialise tracking control # Initialise tracking control
self._hbat_tracking_enabled_rw = True self._tracking_enabled_rw = True
# Set a reference of RECV device that is correlated to this BEAM device # Set a reference of RECV device that is correlated to this BEAM device
util = Util.instance() util = Util.instance()
...@@ -116,22 +116,22 @@ class TileBeam(beam_device): ...@@ -116,22 +116,22 @@ class TileBeam(beam_device):
# absolute positions of each antenna element # absolute positions of each antenna element
self.HBAT_antenna_positions = [HBAT_reference_itrf[tile] + HBAT_antenna_itrf_offsets[tile] for tile in range(96)] self.HBAT_antenna_positions = [HBAT_reference_itrf[tile] + HBAT_antenna_itrf_offsets[tile] for tile in range(96)]
# Create a thread object to update HBAT beam weights # Create a thread object to update beam weights
self.HBAT_beam_tracker = BeamTracker(self) self.Beam_tracker = BeamTracker(self)
@log_exceptions() @log_exceptions()
def configure_for_on(self): def configure_for_on(self):
super().configure_for_on() super().configure_for_on()
# Start beam tracking thread # Start beam tracking thread
if self._hbat_tracking_enabled_rw: if self._tracking_enabled_rw:
self.HBAT_beam_tracker.start() self.Beam_tracker.start()
@log_exceptions() @log_exceptions()
def configure_for_off(self): def configure_for_off(self):
if self.HBAT_beam_tracker: if self.Beam_tracker:
# Stop thread object # Stop thread object
self.HBAT_beam_tracker.stop() self.Beam_tracker.stop()
super().configure_for_off() super().configure_for_off()
...@@ -139,28 +139,28 @@ class TileBeam(beam_device): ...@@ -139,28 +139,28 @@ class TileBeam(beam_device):
# internal functions # internal functions
# -------- # --------
def write_HBAT_pointing_direction_RW(self, value): def write_Pointing_direction_RW(self, value):
""" Setter method for attribute HBAT_pointing_direction_RW """ """ Setter method for attribute Pointing_direction_RW """
# verify whether values are valid # verify whether values are valid
for tile in range(96): for tile in range(96):
if not self.HBAT_delay_calculators[tile].is_valid_direction(value[tile]): if not self.HBAT_delay_calculators[tile].is_valid_direction(value[tile]):
raise ValueError(f"Invalid direction: {value[tile]}") raise ValueError(f"Invalid direction: {value[tile]}")
self._hbat_pointing_direction_rw = value self._pointing_direction_rw = value
# force update across tiles if pointing changes # force update across tiles if pointing changes
self.HBAT_beam_tracker.force_update() self.Beam_tracker.force_update()
logger.info("Pointing direction update requested") logger.info("Pointing direction update requested")
def write_HBAT_tracking_enabled_RW(self, value): def write_Tracking_enabled_RW(self, value):
self._hbat_tracking_enabled_rw = value self._tracking_enabled_rw = value
if value: if value:
self.HBAT_beam_tracker.start() self.Beam_tracker.start()
else: else:
self.HBAT_beam_tracker.stop() self.Beam_tracker.stop()
def _HBAT_delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = None): def _delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = None):
""" """
Calculate the delays (in seconds) based on the pointing list and the timestamp Calculate the delays (in seconds) based on the pointing list and the timestamp
""" """
...@@ -182,7 +182,7 @@ class TileBeam(beam_device): ...@@ -182,7 +182,7 @@ class TileBeam(beam_device):
return delays return delays
def _HBAT_set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime = None): def _set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime = None):
""" """
Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters)
""" """
...@@ -193,7 +193,7 @@ class TileBeam(beam_device): ...@@ -193,7 +193,7 @@ class TileBeam(beam_device):
timestamp = datetime.datetime.now() timestamp = datetime.datetime.now()
# Retrieve delays from casacore # Retrieve delays from casacore
delays = self._HBAT_delays(pointing_direction, timestamp) delays = self._delays(pointing_direction, timestamp)
# Convert delays into beam weights # Convert delays into beam weights
delays = delays.flatten() delays = delays.flatten()
...@@ -208,8 +208,8 @@ class TileBeam(beam_device): ...@@ -208,8 +208,8 @@ class TileBeam(beam_device):
mask = self.recv_proxy.ANT_mask_RW.flatten() mask = self.recv_proxy.ANT_mask_RW.flatten()
for rcu in range(96): for rcu in range(96):
if mask[rcu]: if mask[rcu]:
self._hbat_pointing_direction_r[rcu] = pointing_direction[rcu] self._pointing_direction_r[rcu] = pointing_direction[rcu]
self._hbat_pointing_timestamp_r[rcu] = timestamp.timestamp() self._pointing_timestamp_r[rcu] = timestamp.timestamp()
logger.info("Pointing direction updated") logger.info("Pointing direction updated")
# -------- # --------
...@@ -220,7 +220,7 @@ class TileBeam(beam_device): ...@@ -220,7 +220,7 @@ class TileBeam(beam_device):
@DebugIt() @DebugIt()
@log_exceptions() @log_exceptions()
@only_in_states(beam_device.DEFAULT_COMMAND_STATES) @only_in_states(beam_device.DEFAULT_COMMAND_STATES)
def HBAT_delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = None): def delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = None):
""" """
Calculate the delays (in seconds) based on the pointing list and the timestamp Calculate the delays (in seconds) based on the pointing list and the timestamp
TBD: antenna and reference positions will be retrieved from RECV and not stored as BEAM device properties TBD: antenna and reference positions will be retrieved from RECV and not stored as BEAM device properties
...@@ -233,7 +233,7 @@ class TileBeam(beam_device): ...@@ -233,7 +233,7 @@ class TileBeam(beam_device):
pointing_direction = numpy.array(pointing_direction).reshape(96,3) pointing_direction = numpy.array(pointing_direction).reshape(96,3)
delays = self._HBAT_delays(pointing_direction, timestamp) delays = self._delays(pointing_direction, timestamp)
return delays.flatten() return delays.flatten()
...@@ -241,7 +241,7 @@ class TileBeam(beam_device): ...@@ -241,7 +241,7 @@ class TileBeam(beam_device):
@DebugIt() @DebugIt()
@log_exceptions() @log_exceptions()
@only_in_states(beam_device.DEFAULT_COMMAND_STATES) @only_in_states(beam_device.DEFAULT_COMMAND_STATES)
def HBAT_set_pointing(self, pointing_direction: list, timestamp: datetime.datetime = None): def set_pointing(self, pointing_direction: list, timestamp: datetime.datetime = None):
""" """
Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters)
""" """
...@@ -254,12 +254,12 @@ class TileBeam(beam_device): ...@@ -254,12 +254,12 @@ class TileBeam(beam_device):
# Reshape the flatten input array # Reshape the flatten input array
pointing_direction = numpy.array(pointing_direction).reshape(96,3) pointing_direction = numpy.array(pointing_direction).reshape(96,3)
self._HBAT_set_pointing(pointing_direction, timestamp) self._set_pointing(pointing_direction, timestamp)
@command(dtype_in = DevString) @command(dtype_in = DevString)
@DebugIt() @DebugIt()
@only_in_states(beam_device.DEFAULT_COMMAND_STATES) @only_in_states(beam_device.DEFAULT_COMMAND_STATES)
def HBAT_set_pointing_for_specific_time(self, parameters: DevString = None): def set_pointing_for_specific_time(self, parameters: DevString = None):
""" """
Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters) Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters)
for the given timestamp for the given timestamp
...@@ -277,7 +277,7 @@ class TileBeam(beam_device): ...@@ -277,7 +277,7 @@ class TileBeam(beam_device):
# Reshape the flatten pointing array # Reshape the flatten pointing array
pointing_direction = numpy.array(pointing_direction).reshape(96,3) pointing_direction = numpy.array(pointing_direction).reshape(96,3)
self._HBAT_set_pointing(pointing_direction, timestamp) self._set_pointing(pointing_direction, timestamp)
# ---------- # ----------
...@@ -313,7 +313,7 @@ class BeamTracker(): ...@@ -313,7 +313,7 @@ class BeamTracker():
return return
self.done = False self.done = False
self.thread = Thread(target=self._update_HBAT_pointing_direction, name=f"BeamTracker of {self.device.get_name()}") self.thread = Thread(target=self._update_pointing_direction, name=f"BeamTracker of {self.device.get_name()}")
self.thread.start() self.thread.start()
logger.info("BeamTracking thread started") logger.info("BeamTracking thread started")
...@@ -359,13 +359,13 @@ class BeamTracker(): ...@@ -359,13 +359,13 @@ class BeamTracker():
now = datetime.datetime.now().timestamp() now = datetime.datetime.now().timestamp()
# Computes the left seconds before the next update # Computes the left seconds before the next update
next_update_in = self.device.HBAT_beam_tracking_interval - (now % self.device.HBAT_beam_tracking_interval) next_update_in = self.device.Beam_tracking_interval - (now % self.device.Beam_tracking_interval)
# Computes the needed sleep time before the next update # Computes the needed sleep time before the next update
sleep_time = next_update_in - self.device.HBAT_beam_tracking_preparation_period sleep_time = next_update_in - self.device.Beam_tracking_preparation_period
# If sleep time is negative, add the tracking interval for the next update # If sleep time is negative, add the tracking interval for the next update
if sleep_time < 0: if sleep_time < 0:
return sleep_time + self.device.HBAT_beam_tracking_interval return sleep_time + self.device.Beam_tracking_interval
else: else:
return sleep_time return sleep_time
...@@ -375,14 +375,14 @@ class BeamTracker(): ...@@ -375,14 +375,14 @@ class BeamTracker():
@log_exceptions() @log_exceptions()
@fault_on_error() @fault_on_error()
def _update_HBAT_pointing_direction(self): def _update_pointing_direction(self):
""" Updates the beam weights using a fixed interval of time """ """ Updates the beam weights using a fixed interval of time """
# Check if flag beamtracking is true # Check if flag beamtracking is true
with self.update_lock: with self.update_lock:
while not self.done: while not self.done:
self.stale_pointing = False self.stale_pointing = False
self.device._HBAT_set_pointing(self.device._hbat_pointing_direction_rw, datetime.datetime.now()) self.device._set_pointing(self.device._pointing_direction_rw, datetime.datetime.now())
# sleep until the next update, or when interrupted (this releases the lock, allowing for notification) # sleep until the next update, or when interrupted (this releases the lock, allowing for notification)
# note that we need wait_for as conditions can be triggered multiple times in succession # note that we need wait_for as conditions can be triggered multiple times in succession
......
...@@ -39,16 +39,16 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): ...@@ -39,16 +39,16 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase):
recv_proxy.set_defaults() recv_proxy.set_defaults()
return recv_proxy return recv_proxy
def test_HBAT_delays_dims(self): def test_delays_dims(self):
"""Verify HBAT delays are retrieved with correct dimensions""" """Verify delays are retrieved with correct dimensions"""
self.setup_recv_proxy() self.setup_recv_proxy()
# setup BEAM # setup BEAM
self.proxy.warm_boot() self.proxy.warm_boot()
# verify HBAT_delays method returns the correct dimensions # verify delays method returns the correct dimensions
HBAT_delays = self.proxy.HBAT_delays(self.pointing_direction) delays = self.proxy.delays(self.pointing_direction)
self.assertEqual(1536, len(HBAT_delays)) # 96*16 self.assertEqual(1536, len(delays)) # 96*16
def test_set_pointing(self): def test_set_pointing(self):
"""Verify if set pointing procedure is correctly executed""" """Verify if set pointing procedure is correctly executed"""
...@@ -56,32 +56,32 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): ...@@ -56,32 +56,32 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase):
# setup BEAM # setup BEAM
self.proxy.warm_boot() self.proxy.warm_boot()
self.proxy.HBAT_tracking_enabled_RW = False self.proxy.Tracking_enabled_RW = False
# Verify attribute is present (all zeros if never used before) # Verify attribute is present (all zeros if never used before)
HBAT_delays_r1 = numpy.array(recv_proxy.read_attribute('HBAT_BF_delay_steps_RW').value) delays_r1 = numpy.array(recv_proxy.read_attribute('HBAT_BF_delay_steps_RW').value)
self.assertIsNotNone(HBAT_delays_r1) self.assertIsNotNone(delays_r1)
time.sleep(3) time.sleep(3)
# Verify writing operation does not lead to errors # Verify writing operation does not lead to errors
self.proxy.HBAT_set_pointing(self.pointing_direction) # write values to RECV self.proxy.set_pointing(self.pointing_direction) # write values to RECV
HBAT_delays_r2 = numpy.array(recv_proxy.read_attribute('HBAT_BF_delay_steps_RW').value) delays_r2 = numpy.array(recv_proxy.read_attribute('HBAT_BF_delay_steps_RW').value)
self.assertIsNotNone(HBAT_delays_r2) self.assertIsNotNone(delays_r2)
# Verify delays changed (to be discussed) # Verify delays changed (to be discussed)
#self.assertFalse((HBAT_delays_r1==HBAT_delays_r2).all()) #self.assertFalse((delays_r1==delays_r2).all())
def test_pointing_to_zenith(self): def test_pointing_to_zenith(self):
# setup RECV as well # setup RECV as well
recv_proxy = self.setup_recv_proxy() recv_proxy = self.setup_recv_proxy()
self.proxy.warm_boot() self.proxy.warm_boot()
self.proxy.HBAT_tracking_enabled_RW = False self.proxy.Tracking_enabled_RW = False
# Point to Zenith # Point to Zenith
self.proxy.HBAT_set_pointing(numpy.array([["AZELGEO","0deg","90deg"]] * 96).flatten()) self.proxy.set_pointing(numpy.array([["AZELGEO","0deg","90deg"]] * 96).flatten())
calculated_HBAT_delay_steps = numpy.array(recv_proxy.read_attribute('HBAT_BF_delay_steps_RW').value) calculated_HBAT_delay_steps = numpy.array(recv_proxy.read_attribute('HBAT_BF_delay_steps_RW').value)
...@@ -94,10 +94,10 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): ...@@ -94,10 +94,10 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase):
recv_proxy = self.setup_recv_proxy() recv_proxy = self.setup_recv_proxy()
self.proxy.warm_boot() self.proxy.warm_boot()
self.proxy.HBAT_tracking_enabled_RW = False self.proxy.Tracking_enabled_RW = False
# point at north on the horizon # point at north on the horizon
self.proxy.HBAT_set_pointing(["AZELGEO","0deg","0deg"] * 96) self.proxy.set_pointing(["AZELGEO","0deg","0deg"] * 96)
# obtain delays of the X polarisation of all the elements of the first tile # obtain delays of the X polarisation of all the elements of the first tile
north_beam_delay_steps = recv_proxy.HBAT_BF_delay_steps_RW[0].reshape(2,4,4)[0] north_beam_delay_steps = recv_proxy.HBAT_BF_delay_steps_RW[0].reshape(2,4,4)[0]
...@@ -107,7 +107,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): ...@@ -107,7 +107,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase):
for angle in (90,180,270): for angle in (90,180,270):
# point at angle degrees (90=E, 180=S, 270=W) # point at angle degrees (90=E, 180=S, 270=W)
self.proxy.HBAT_set_pointing(["AZELGEO",f"{angle}deg","0deg"] * 96) self.proxy.set_pointing(["AZELGEO",f"{angle}deg","0deg"] * 96)
# obtain delays of the X polarisation of all the elements of the first tile # obtain delays of the X polarisation of all the elements of the first tile
angled_beam_delay_steps = recv_proxy.HBAT_BF_delay_steps_RW[0].reshape(2,4,4)[0] angled_beam_delay_steps = recv_proxy.HBAT_BF_delay_steps_RW[0].reshape(2,4,4)[0]
...@@ -121,7 +121,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): ...@@ -121,7 +121,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase):
recv_proxy = self.setup_recv_proxy() recv_proxy = self.setup_recv_proxy()
self.proxy.warm_boot() self.proxy.warm_boot()
self.proxy.HBAT_tracking_enabled_RW = False self.proxy.Tracking_enabled_RW = False
# Point to LOFAR 1 ref pointing (0.929342, 0.952579, J2000) # Point to LOFAR 1 ref pointing (0.929342, 0.952579, J2000)
pointings = numpy.array([["J2000", "0.929342rad", "0.952579rad"]] * 96).flatten() pointings = numpy.array([["J2000", "0.929342rad", "0.952579rad"]] * 96).flatten()
...@@ -134,7 +134,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): ...@@ -134,7 +134,7 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase):
} }
json_string = json.dumps(parameters, cls=NumpyEncoder) json_string = json.dumps(parameters, cls=NumpyEncoder)
self.proxy.HBAT_set_pointing_for_specific_time(json_string) self.proxy.set_pointing_for_specific_time(json_string)
calculated_HBAT_delay_steps = numpy.array(recv_proxy.read_attribute('HBAT_BF_delay_steps_RW').value) calculated_HBAT_delay_steps = numpy.array(recv_proxy.read_attribute('HBAT_BF_delay_steps_RW').value)
...@@ -154,12 +154,12 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): ...@@ -154,12 +154,12 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase):
self.proxy.warm_boot() self.proxy.warm_boot()
# check if we're really tracking # check if we're really tracking
self.assertTrue(self.proxy.HBAT_tracking_enabled_R) self.assertTrue(self.proxy.Tracking_enabled_R)
# point somewhere # point somewhere
new_pointings = [("J2000",f"{tile}deg","0deg") for tile in range(96)] new_pointings = [("J2000",f"{tile}deg","0deg") for tile in range(96)]
self.proxy.HBAT_pointing_direction_RW = new_pointings self.proxy.Pointing_direction_RW = new_pointings
# check pointing # check pointing
self.assertListEqual(new_pointings, list(self.proxy.HBAT_pointing_direction_R)) self.assertListEqual(new_pointings, list(self.proxy.Pointing_direction_R))
...@@ -59,7 +59,7 @@ class TestRecvCluster(base.IntegrationTestCase): ...@@ -59,7 +59,7 @@ class TestRecvCluster(base.IntegrationTestCase):
for _i in range(25): for _i in range(25):
start_time = time.monotonic_ns() start_time = time.monotonic_ns()
for proxy in beam_proxies: for proxy in beam_proxies:
proxy.HBAT_set_pointing(self.pointing_direction) proxy.set_pointing(self.pointing_direction)
stop_time = time.monotonic_ns() stop_time = time.monotonic_ns()
results.append(stop_time - start_time) results.append(stop_time - start_time)
......
...@@ -33,11 +33,11 @@ class TestBeamDevice(base.TestCase): ...@@ -33,11 +33,11 @@ class TestBeamDevice(base.TestCase):
with DeviceTestContext(tilebeam.TileBeam, process=True, timeout=10) as proxy: with DeviceTestContext(tilebeam.TileBeam, process=True, timeout=10) as proxy:
proxy.initialise() proxy.initialise()
self.assertEqual(96, len(proxy.read_attribute( self.assertEqual(96, len(proxy.read_attribute(
"HBAT_pointing_direction_R").value)) "Pointing_direction_R").value))
def test_get_pointing_timestamps(self): def test_get_pointing_timestamps(self):
"""Verify can read timestamps attribute and length matches without err""" """Verify can read timestamps attribute and length matches without err"""
with DeviceTestContext(tilebeam.TileBeam, process=True, timeout=10) as proxy: with DeviceTestContext(tilebeam.TileBeam, process=True, timeout=10) as proxy:
proxy.initialise() proxy.initialise()
self.assertEqual(96, len(proxy.read_attribute( self.assertEqual(96, len(proxy.read_attribute(
"HBAT_pointing_timestamp_R").value)) "Pointing_timestamp_R").value))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment