diff --git a/tangostationcontrol/tangostationcontrol/devices/beam.py b/tangostationcontrol/tangostationcontrol/devices/beam.py index d57553fb336d71dd506ddd93b28728b18e908a7a..18d62f0a5aa9ec863f38b1cb6bbef61d64f35fdb 100644 --- a/tangostationcontrol/tangostationcontrol/devices/beam.py +++ b/tangostationcontrol/tangostationcontrol/devices/beam.py @@ -11,7 +11,7 @@ from email.policy import default import numpy import datetime from tango.server import attribute, command, device_property -from tango import AttrWriteType, AttrDataFormat, DebugIt, DevState, DeviceProxy, DevVarStringArray, DevVarFloatArray, DevVarLong64Array, DevVarDoubleArray +from tango import AttrWriteType, DebugIt, DevState, DeviceProxy, DevVarStringArray, DevVarDoubleArray # Additional import from tangostationcontrol.common.entrypoint import entry @@ -66,8 +66,6 @@ class Beam(lofar_device): dtype=(numpy.double,), max_dim_x=96, fget=lambda self: self._hbat_pointing_timestamp) - RECV_name = 'stat/recv/1' - # Directory where the casacore measures that we use, reside. We configure ~/.casarc to # use the symlink /opt/IERS/current, which we switch to the actual set of files to use. measures_directory_R = attribute(dtype=str, access=AttrWriteType.READ, fget = lambda self: get_measures_directory()) @@ -80,13 +78,11 @@ class Beam(lofar_device): # -------- @log_exceptions() def configure_for_initialise(self): - # Set a reference of RECV device - try: - self.recv_proxy = DeviceProxy(self.RECV_name) - except: - logger.warning("RECV device offline") # avoid error raising in unittest super().configure_for_initialise() + # Set a reference of RECV device + self.recv_proxy = DeviceProxy("STAT/RECV/1") + # -------- # internal functions # -------- @@ -109,7 +105,8 @@ class Beam(lofar_device): return delays - def _calculate_HBAT_bf_delays(self, delays: numpy.ndarray): + @staticmethod + def _calculate_HBAT_bf_delays(delays: numpy.ndarray): """ Helper function that converts a signal path delay (in seconds) to an analog beam weight, which is a value per tile per dipole per polarisation. @@ -140,7 +137,7 @@ class Beam(lofar_device): # Record where we now point to, now that we've updated the weights. # Only the entries within the mask have been updated mask = self.recv_proxy.RCU_mask_RW - for tile in range(96): + for tile in range(32): if mask[tile]: self._hbat_pointing_direction[tile] = pointing_direction[tile] self._hbat_pointing_timestamp[tile] = timestamp @@ -180,7 +177,7 @@ class Beam(lofar_device): logger.warning("Restarting device to activate new measures tables") restart_python() - @command(dtype_in=DevVarStringArray, dformat_in=AttrDataFormat.IMAGE, dtype_out=DevVarDoubleArray) + @command(dtype_in=DevVarStringArray, dtype_out=DevVarDoubleArray) @DebugIt() @log_exceptions() @only_in_states([DevState.ON]) @@ -195,22 +192,6 @@ class Beam(lofar_device): return delays.flatten() - @command(dtype_in=DevVarFloatArray, dtype_out=DevVarLong64Array) - @DebugIt() - @log_exceptions() - @only_in_states([DevState.ON]) - def calculate_HBAT_bf_delays(self, delays: numpy.ndarray): - """ - Helper function that converts a signal path delay (in seconds) to an analog beam weight, - which is a value per tile per dipole per polarisation. - """ - - delays = delays.reshape(96,16) - - result = self._calculate_HBAT_bf_delays(delays) - - return result.flatten() - @command(dtype_in=DevVarStringArray) @DebugIt() @only_in_states([DevState.ON]) diff --git a/tangostationcontrol/tangostationcontrol/integration_test/devices/test_device_beam.py b/tangostationcontrol/tangostationcontrol/integration_test/devices/test_device_beam.py index 8c546220d2c7570268e2754377a9594c23a06fa7..8e1f2091f41504358619d793aa98ba52761f5e03 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/devices/test_device_beam.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/devices/test_device_beam.py @@ -34,7 +34,7 @@ class TestDeviceBeam(AbstractTestBases.TestDeviceBase): self.assertIsNotNone(HBAT_delays_r1) # Verify writing operation does not lead to errors - self.proxy.HBAT_set_pointing(["J2000", "0deg", "0deg"] * 96) # write values to RECV + self.proxy.HBAT_set_pointing(numpy.array([["J2000","0deg","0deg"]] * 96).flatten()) # write values to RECV HBAT_delays_r2 = numpy.array(recv_proxy.read_attribute('HBAT_BF_delays_RW').value) self.assertIsNotNone(HBAT_delays_r2) diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_beam_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_beam_device.py index 8db5c40bc43900e74c8d253d79ab84d35f7b07f0..bb89efc7a579cbb74182401c884713db4e32a116 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_beam_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_beam_device.py @@ -7,13 +7,12 @@ # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. -import numpy - -from tango import DevState, DevFailed +from tango import DevState from tango.test_context import DeviceTestContext from tangostationcontrol.devices import beam +import numpy import mock from tangostationcontrol.test import base @@ -32,6 +31,13 @@ class TestBeamDevice(base.TestCase): beam.Beam, 'init_device', spec=beam.Beam.init_device) self.m_init = init_patcher.start() self.addCleanup(init_patcher.stop) + + # Patch DeviceProxy to allow making the proxies during initialisation + # that we otherwise avoid using + proxy_patcher = mock.patch.object( + beam, 'DeviceProxy') + self.m_init = proxy_patcher.start() + self.addCleanup(proxy_patcher.stop) def test_get_pointing_directions(self): """Verify can read pointings attribute and length matches without err""" @@ -57,7 +63,7 @@ class TestBeamDevice(base.TestCase): self.assertEqual(DevState.ON, proxy.state()) # verify HBAT_delays method returns the correct dimensions - HBAT_delays = proxy.HBAT_delays([["J2000","0deg","0deg"]] * 96) + HBAT_delays = proxy.HBAT_delays(numpy.array([["J2000","0deg","0deg"]] * 96).flatten()) self.assertEqual((96*16,), HBAT_delays.shape) def test_HBAT_delays_calculations(self): @@ -71,6 +77,6 @@ class TestBeamDevice(base.TestCase): self.assertEqual(DevState.ON, proxy.state()) # verify if values are actually transformed - HBAT_delays = proxy.HBAT_delays([["J2000","0deg","0deg"]] * 96) - HBAT_bf_delays = proxy.calculate_HBAT_bf_delays(HBAT_delays) + HBAT_delays = proxy.HBAT_delays(numpy.array([["J2000","0deg","0deg"]] * 96).flatten()) + HBAT_bf_delays = beam.Beam._calculate_HBAT_bf_delays(HBAT_delays) self.assertNotEqual(HBAT_delays, HBAT_bf_delays)