Skip to content
Snippets Groups Projects
Commit 253fcc12 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-480: avoid being able to write the HBAT_pointing attributes from a proxy,...

L2SS-480: avoid being able to write the HBAT_pointing attributes from a proxy, and simplified the interface and tests
parent 34723a9f
No related branches found
No related tags found
1 merge request!220Resolve L2SS-480 "Delays to beam weights"
...@@ -32,14 +32,6 @@ class Beam(lofar_device): ...@@ -32,14 +32,6 @@ class Beam(lofar_device):
_hbat_pointing_direction = numpy.zeros((96,3), dtype=numpy.str) _hbat_pointing_direction = numpy.zeros((96,3), dtype=numpy.str)
_hbat_pointing_epoch = numpy.zeros(96, dtype=numpy.double) _hbat_pointing_epoch = numpy.zeros(96, dtype=numpy.double)
@property
def hbat_pointing_direction(self):
return tuple(self._hbat_pointing_direction)
@property
def hbat_pointing_epoch(self):
return tuple(self._hbat_pointing_epoch)
# ----------------- # -----------------
# Device Properties # Device Properties
# ----------------- # -----------------
...@@ -61,11 +53,11 @@ class Beam(lofar_device): ...@@ -61,11 +53,11 @@ class Beam(lofar_device):
HBAT_pointing_direction_R = attribute(access=AttrWriteType.READ, HBAT_pointing_direction_R = attribute(access=AttrWriteType.READ,
dtype=((numpy.str,),), max_dim_x=3, max_dim_y=96, dtype=((numpy.str,),), max_dim_x=3, max_dim_y=96,
fget=lambda self: self.hbat_pointing_direction) fget=lambda self: self._hbat_pointing_direction)
HBAT_pointing_epoch_R = attribute(access=AttrWriteType.READ, HBAT_pointing_epoch_R = attribute(access=AttrWriteType.READ,
dtype=(numpy.double,), max_dim_x=96, dtype=(numpy.double,), max_dim_x=96,
fget=lambda self: self.hbat_pointing_epoch) fget=lambda self: self._hbat_pointing_epoch)
RECV_name = 'stat/recv/1' RECV_name = 'stat/recv/1'
...@@ -92,18 +84,6 @@ class Beam(lofar_device): ...@@ -92,18 +84,6 @@ class Beam(lofar_device):
# internal functions # internal functions
# -------- # --------
def _set_pointing_directions(self, new_points: numpy.array):
if new_points.shape != (96,3):
raise ValueError(f"New pointing directions given as shape {new_points.shape}, must be (96,3)")
self._hbat_pointing_direction = new_points
def _set_pointing_epochs(self, new_points: numpy.array):
if new_points.shape != (96,):
raise ValueError(f"New pointing epochs given as shape {new_points.shape}, must be (96,)")
self._hbat_pointing_epoch = new_points
def _HBAT_delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()): def _HBAT_delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()):
""" """
Calculate the delays (in seconds) based on the pointing list and the timestamp Calculate the delays (in seconds) based on the pointing list and the timestamp
...@@ -151,33 +131,15 @@ class Beam(lofar_device): ...@@ -151,33 +131,15 @@ class Beam(lofar_device):
# Only the entries within the mask have been updated: # Only the entries within the mask have been updated:
# mask = False -> retain value, mask = True -> use new value # mask = False -> retain value, mask = True -> use new value
mask = self.recv_proxy.RCU_mask_RW mask = self.recv_proxy.RCU_mask_RW
pointing_epochs = numpy.tile([timestamp], 96)
for tile in range(96): for tile in range(96):
if not mask[tile]: if mask[tile]:
pointing_direction[tile] = self.hbat_pointing_direction[tile] self._hbat_pointing_direction[tile] = pointing_direction[tile]
pointing_epochs[tile] = self.hbat_pointing_epoch[tile] self._hbat_pointing_epoch[tile] = timestamp
self._set_pointing_directions(pointing_direction)
self._set_pointing_epochs(pointing_epochs)
# -------- # --------
# Commands # Commands
# -------- # --------
@DebugIt()
@command(dtype_in=(numpy.str,))
@only_in_states([DevState.STANDBY, DevState.ON])
def set_pointing_directions(self, new_points: numpy.array):
new_points = numpy.array(new_points).reshape(96,3)
self._set_pointing_directions(new_points)
@DebugIt()
@command(dtype_in=(numpy.double,))
@only_in_states([DevState.STANDBY, DevState.ON])
def set_pointing_epochs(self, new_points: numpy.array):
self._set_pointing_epochs(new_points)
@command(dtype_out=str, doc_out="Name of newly installed measures directory") @command(dtype_out=str, doc_out="Name of newly installed measures directory")
@DebugIt() @DebugIt()
@log_exceptions() @log_exceptions()
......
...@@ -46,88 +46,6 @@ class TestBeamDevice(base.TestCase): ...@@ -46,88 +46,6 @@ class TestBeamDevice(base.TestCase):
self.assertEqual(96, len(proxy.read_attribute( self.assertEqual(96, len(proxy.read_attribute(
"HBAT_pointing_epoch_R").value)) "HBAT_pointing_epoch_R").value))
def test_set_pointing_direction(self):
"""Verify can set pointings attribute without error"""
with DeviceTestContext(beam.Beam, process=True) as proxy:
proxy.init()
proxy.Initialise()
self.assertEqual(DevState.STANDBY, proxy.state())
proxy.set_pointing_directions(numpy.zeros((96*3), dtype=numpy.str))
def test_set_pointing_epochs(self):
"""Verify can set epochs attribute without error"""
with DeviceTestContext(beam.Beam, process=True) as proxy:
proxy.init()
proxy.Initialise()
self.assertEqual(DevState.STANDBY, proxy.state())
proxy.set_pointing_epochs(numpy.zeros(96))
def test_direction_pointing(self):
"""Set and Get test with actual values for pointing attribute"""
with DeviceTestContext(beam.Beam, process=True) as proxy:
proxy.init()
proxy.Initialise()
self.assertEqual(DevState.STANDBY, proxy.state())
# Evaluate default all zeros are present using numpy array compare
compare_obj = numpy.zeros((96,3), dtype=numpy.str) == proxy.read_attribute("HBAT_pointing_direction_R").value
self.assertTrue(compare_obj.all())
# Set direction pointings to range of incrementing values
proxy.set_pointing_directions(numpy.array(["J2000","0deg", "0deg"] * 96))
# Verify attribute has been updated with correct data
compare_obj = numpy.array([["J2000","0deg", "0deg"]] * 96) == proxy.read_attribute("HBAT_pointing_direction_R").value
self.assertTrue(compare_obj.all())
def test_direction_epochs(self):
"""Set and Get test with actual values for pointing attribute"""
with DeviceTestContext(beam.Beam, process=True) as proxy:
proxy.init()
proxy.Initialise()
self.assertEqual(DevState.STANDBY, proxy.state())
# Evaluate default all zeros are present using numpy array compare
compare_obj = numpy.zeros(96) == proxy.read_attribute("HBAT_pointing_epoch_R").value
self.assertTrue(compare_obj.all())
# Set direction pointings to range of incrementing values
proxy.set_pointing_epochs(numpy.arange(0,96))
# Verify attribute has been updated with correct data
compare_obj = numpy.arange(0,96) == proxy.read_attribute("HBAT_pointing_epoch_R").value
self.assertTrue(compare_obj.all())
def test_pointing_invalid(self):
"""Test that set pointings command refuses invalid lengths"""
with DeviceTestContext(beam.Beam, process=True) as proxy:
proxy.init()
proxy.Initialise()
self.assertEqual(DevState.STANDBY, proxy.state())
# should return error due to invalid length
with self.assertRaises(DevFailed):
proxy.set_pointing_directions(numpy.zeros(55, dtype=numpy.str))
def test_epoch_invalid(self):
"""Test that set epochs command refuses invalid lengths"""
with DeviceTestContext(beam.Beam, process=True) as proxy:
proxy.init()
proxy.Initialise()
self.assertEqual(DevState.STANDBY, proxy.state())
# should return error due to invalid length
with self.assertRaises(DevFailed):
proxy.set_pointing_epochs(numpy.zeros(55))
def test_HBAT_delays_dims(self): def test_HBAT_delays_dims(self):
"""Verify HBAT delays are retrieved with correct dimensions""" """Verify HBAT delays are retrieved with correct dimensions"""
with DeviceTestContext(beam.Beam, process=True) as proxy: with DeviceTestContext(beam.Beam, process=True) as proxy:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment