Skip to content
Snippets Groups Projects
Commit 946101ea authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'L2SS-480-delays-to-beam-weights' into L2SS-523-delay-calibration-table

parents 33893ac4 74c808ae
No related branches found
No related tags found
1 merge request!227L2SS-523 "Add signal delay calibration table to BEAM"
......@@ -11,7 +11,7 @@ from email.policy import default
import numpy
import datetime
from tango.server import attribute, command, device_property
from tango import AttrWriteType, AttrDataFormat, DebugIt, DevState, DeviceProxy, DevVarStringArray, DevVarFloatArray, DevVarLong64Array, DevVarDoubleArray
from tango import AttrWriteType, DebugIt, DevState, DeviceProxy, DevVarStringArray, DevVarDoubleArray
# Additional import
from tangostationcontrol.common.entrypoint import entry
......@@ -66,8 +66,6 @@ class Beam(lofar_device):
dtype=(numpy.double,), max_dim_x=96,
fget=lambda self: self._hbat_pointing_timestamp)
RECV_name = 'stat/recv/1'
# Directory where the casacore measures that we use, reside. We configure ~/.casarc to
# use the symlink /opt/IERS/current, which we switch to the actual set of files to use.
measures_directory_R = attribute(dtype=str, access=AttrWriteType.READ, fget = lambda self: get_measures_directory())
......@@ -80,13 +78,11 @@ class Beam(lofar_device):
# --------
@log_exceptions()
def configure_for_initialise(self):
# Set a reference of RECV device
try:
self.recv_proxy = DeviceProxy(self.RECV_name)
except:
logger.warning("RECV device offline") # avoid error raising in unittest
super().configure_for_initialise()
# Set a reference of RECV device
self.recv_proxy = DeviceProxy("STAT/RECV/1")
# --------
# internal functions
# --------
......@@ -109,7 +105,8 @@ class Beam(lofar_device):
return delays
def _calculate_HBAT_bf_delays(self, delays: numpy.ndarray):
@staticmethod
def _calculate_HBAT_bf_delays(delays: numpy.ndarray):
"""
Helper function that converts a signal path delay (in seconds) to an analog beam weight,
which is a value per tile per dipole per polarisation.
......@@ -140,7 +137,7 @@ class Beam(lofar_device):
# Record where we now point to, now that we've updated the weights.
# Only the entries within the mask have been updated
mask = self.recv_proxy.RCU_mask_RW
for tile in range(96):
for tile in range(32):
if mask[tile]:
self._hbat_pointing_direction[tile] = pointing_direction[tile]
self._hbat_pointing_timestamp[tile] = timestamp
......@@ -180,7 +177,7 @@ class Beam(lofar_device):
logger.warning("Restarting device to activate new measures tables")
restart_python()
@command(dtype_in=DevVarStringArray, dformat_in=AttrDataFormat.IMAGE, dtype_out=DevVarDoubleArray)
@command(dtype_in=DevVarStringArray, dtype_out=DevVarDoubleArray)
@DebugIt()
@log_exceptions()
@only_in_states([DevState.ON])
......@@ -195,22 +192,6 @@ class Beam(lofar_device):
return delays.flatten()
@command(dtype_in=DevVarFloatArray, dtype_out=DevVarLong64Array)
@DebugIt()
@log_exceptions()
@only_in_states([DevState.ON])
def calculate_HBAT_bf_delays(self, delays: numpy.ndarray):
"""
Helper function that converts a signal path delay (in seconds) to an analog beam weight,
which is a value per tile per dipole per polarisation.
"""
delays = delays.reshape(96,16)
result = self._calculate_HBAT_bf_delays(delays)
return result.flatten()
@command(dtype_in=DevVarStringArray)
@DebugIt()
@only_in_states([DevState.ON])
......
......@@ -34,7 +34,7 @@ class TestDeviceBeam(AbstractTestBases.TestDeviceBase):
self.assertIsNotNone(HBAT_delays_r1)
# Verify writing operation does not lead to errors
self.proxy.HBAT_set_pointing(["J2000", "0deg", "0deg"] * 96) # write values to RECV
self.proxy.HBAT_set_pointing(numpy.array([["J2000","0deg","0deg"]] * 96).flatten()) # write values to RECV
HBAT_delays_r2 = numpy.array(recv_proxy.read_attribute('HBAT_BF_delays_RW').value)
self.assertIsNotNone(HBAT_delays_r2)
......
......@@ -7,13 +7,12 @@
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import numpy
from tango import DevState, DevFailed
from tango import DevState
from tango.test_context import DeviceTestContext
from tangostationcontrol.devices import beam
import numpy
import mock
from tangostationcontrol.test import base
......@@ -32,6 +31,13 @@ class TestBeamDevice(base.TestCase):
beam.Beam, 'init_device', spec=beam.Beam.init_device)
self.m_init = init_patcher.start()
self.addCleanup(init_patcher.stop)
# Patch DeviceProxy to allow making the proxies during initialisation
# that we otherwise avoid using
proxy_patcher = mock.patch.object(
beam, 'DeviceProxy')
self.m_init = proxy_patcher.start()
self.addCleanup(proxy_patcher.stop)
def test_get_pointing_directions(self):
"""Verify can read pointings attribute and length matches without err"""
......@@ -57,7 +63,7 @@ class TestBeamDevice(base.TestCase):
self.assertEqual(DevState.ON, proxy.state())
# verify HBAT_delays method returns the correct dimensions
HBAT_delays = proxy.HBAT_delays([["J2000","0deg","0deg"]] * 96)
HBAT_delays = proxy.HBAT_delays(numpy.array([["J2000","0deg","0deg"]] * 96).flatten())
self.assertEqual((96*16,), HBAT_delays.shape)
def test_HBAT_delays_calculations(self):
......@@ -71,6 +77,6 @@ class TestBeamDevice(base.TestCase):
self.assertEqual(DevState.ON, proxy.state())
# verify if values are actually transformed
HBAT_delays = proxy.HBAT_delays([["J2000","0deg","0deg"]] * 96)
HBAT_bf_delays = proxy.calculate_HBAT_bf_delays(HBAT_delays)
HBAT_delays = proxy.HBAT_delays(numpy.array([["J2000","0deg","0deg"]] * 96).flatten())
HBAT_bf_delays = beam.Beam._calculate_HBAT_bf_delays(HBAT_delays)
self.assertNotEqual(HBAT_delays, HBAT_bf_delays)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment