Skip to content
Snippets Groups Projects
Commit 28b83851 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-480-delays-to-beam-weights' into 'master'

Resolve L2SS-480 "Delays to beam weights"

Closes L2SS-480

See merge request !220
parents a16241d3 cd6279d5
No related branches found
No related tags found
1 merge request!220Resolve L2SS-480 "Delays to beam weights"
image: git.astron.nl:5000/lofar2.0/tango/tango-itango:9.3.7
image: git.astron.nl:5000/lofar2.0/tango/lofar-device-base:latest
variables:
GIT_SUBMODULE_STRATEGY: recursive
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
......
......@@ -8,18 +8,17 @@
"""
import numpy
import datetime
from tango.server import attribute, command, device_property
from tango import AttrWriteType, DebugIt, DevState, DeviceProxy, DevVarStringArray, DevVarDoubleArray
from tango.server import attribute
from tango.server import command
from tango import AttrWriteType
from tango import DevState
from tango import DebugIt
# Additional import
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.devices.device_decorators import only_in_states
from tangostationcontrol.devices.lofar_device import lofar_device
from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions
from tangostationcontrol.common.measures import get_measures_directory, use_measures_directory, download_measures, restart_python, get_available_measures_directories
from tangostationcontrol.beam.delays import delay_calculator
from tangostationcontrol.devices.device_decorators import *
import logging
logger = logging.getLogger()
......@@ -30,32 +29,35 @@ __all__ = ["Beam", "main"]
@device_logging_to_python()
class Beam(lofar_device):
_hbat_pointing_direction = numpy.zeros(96)
_hbat_pointing_epoch = numpy.zeros(96)
@property
def hbat_pointing_direction(self):
return tuple(self._hbat_pointing_direction)
@property
def hbat_pointing_epoch(self):
return tuple(self._hbat_pointing_epoch)
_hbat_pointing_direction = numpy.zeros((96,3), dtype=numpy.str)
_hbat_pointing_timestamp = numpy.zeros(96, dtype=numpy.double)
# -----------------
# Device Properties
# -----------------
reference_itrf = device_property(
dtype='DevVarFloatArray',
mandatory=False,
default_value = numpy.tile(numpy.array([3826577.066, 461022.948, 5064892.786]),(96,1)) # CS002LBA, in ITRF2005 timestamp 2012.5
)
antenna_itrf = device_property(
dtype='DevVarFloatArray',
mandatory=False,
default_value = numpy.tile(numpy.array([3826923.546, 460915.441, 5064643.489]),(96,16,1)) # CS001LBA, in ITRF2005 timestamp 2012.5
)
# ----------
# Attributes
# ----------
HBAT_pointing_direction_R = attribute(access=AttrWriteType.READ,
dtype=(numpy.double,), max_dim_x=96,
fget=lambda self: self.hbat_pointing_direction)
dtype=((numpy.str,),), max_dim_x=3, max_dim_y=96,
fget=lambda self: self._hbat_pointing_direction)
HBAT_pointing_epoch_R = attribute(access=AttrWriteType.READ,
HBAT_pointing_timestamp_R = attribute(access=AttrWriteType.READ,
dtype=(numpy.double,), max_dim_x=96,
fget=lambda self: self.hbat_pointing_epoch)
fget=lambda self: self._hbat_pointing_timestamp)
# Directory where the casacore measures that we use, reside. We configure ~/.casarc to
# use the symlink /opt/IERS/current, which we switch to the actual set of files to use.
......@@ -67,32 +69,72 @@ class Beam(lofar_device):
# --------
# overloaded functions
# --------
@log_exceptions()
def configure_for_initialise(self):
super().configure_for_initialise()
# Set a reference of RECV device
self.recv_proxy = DeviceProxy("STAT/RECV/1")
# --------
# Commands
# internal functions
# --------
@DebugIt()
@command(dtype_in=(numpy.double,), dtype_out=int)
@only_in_states([DevState.STANDBY, DevState.ON])
def set_direction_pointings(self, new_points: numpy.array):
if new_points.size != 96:
return -1
def _HBAT_delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()):
"""
Calculate the delays (in seconds) based on the pointing list and the timestamp
TBD: antenna and reference positions will be retrieved from RECV and not stored as BEAM device properties
"""
self._hbat_pointing_direction = new_points
delays = numpy.zeros((96,16), dtype=numpy.float64)
return 0
for tile in range(96):
# initialise delay calculator
d = delay_calculator(self.reference_itrf[tile])
d.set_measure_time(timestamp)
@DebugIt()
@command(dtype_in=(numpy.double,), dtype_out=int)
@only_in_states([DevState.STANDBY, DevState.ON])
def set_direction_epochs(self, new_points: numpy.array):
if new_points.size != 96:
return -1
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions
delays[tile] = d.convert(pointing_direction[tile], self.antenna_itrf[tile])
return delays
@staticmethod
def _calculate_HBAT_bf_delays(delays: numpy.ndarray):
"""
Helper function that converts a signal path delay (in seconds) to an analog beam weight,
which is a value per tile per dipole per polarisation.
"""
# Duplicate delay values per polarisation
polarised_delays = numpy.tile(delays, 2) # output dims -> 96x32
# Divide by 0.5ns and round
HBAT_bf_delays = numpy.array(polarised_delays / 0.5e-09, dtype=numpy.int64)
return HBAT_bf_delays
self._hbat_pointing_epoch = new_points
def _HBAT_set_pointing(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()):
"""
Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters)
"""
# Retrieve delays from casacore
delays = self._HBAT_delays(pointing_direction, timestamp)
# Convert delays into beam weights
HBAT_bf_delays = self._calculate_HBAT_bf_delays(delays)
# Write weights to RECV
self.recv_proxy.HBAT_BF_delays_RW = HBAT_bf_delays
# Record where we now point to, now that we've updated the weights.
# Only the entries within the mask have been updated
mask = self.recv_proxy.Ant_mask_RW.flatten()
for rcu in range(96):
if mask[rcu]:
self._hbat_pointing_direction[rcu] = pointing_direction[rcu]
self._hbat_pointing_timestamp[rcu] = timestamp
return 0
# --------
# Commands
# --------
@command(dtype_out=str, doc_out="Name of newly installed measures directory")
@DebugIt()
......@@ -125,6 +167,32 @@ class Beam(lofar_device):
logger.warning("Restarting device to activate new measures tables")
restart_python()
@command(dtype_in=DevVarStringArray, dtype_out=DevVarDoubleArray)
@DebugIt()
@log_exceptions()
@only_in_states([DevState.ON])
def HBAT_delays(self, pointing_direction: numpy.array, timestamp: datetime.datetime = datetime.datetime.now()):
"""
Calculate the delays (in seconds) based on the pointing list and the timestamp
TBD: antenna and reference positions will be retrieved from RECV and not stored as BEAM device properties
"""
pointing_direction = numpy.array(pointing_direction).reshape(96,3)
delays = self._HBAT_delays(pointing_direction, timestamp)
return delays.flatten()
@command(dtype_in=DevVarStringArray)
@DebugIt()
@only_in_states([DevState.ON])
def HBAT_set_pointing(self, pointing_direction: list, timestamp: datetime.datetime = datetime.datetime.now()):
"""
Uploads beam weights based on a given pointing direction 2D array (96 tiles x 3 parameters)
"""
pointing_direction = numpy.array(pointing_direction).reshape(96,3)
self._HBAT_set_pointing(pointing_direction, timestamp)
# ----------
# Run server
# ----------
......
......@@ -7,9 +7,36 @@
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import numpy
from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy
from .base import AbstractTestBases
class TestDeviceBeam(AbstractTestBases.TestDeviceBase):
def setUp(self):
super().setUp("STAT/Beam/1")
def test_write_HBAT_delays(self):
""" Test whether the delay values are correctly saved into the relative RECV attribute"""
self.proxy.initialise()
self.proxy.on()
# setup RECV as well
recv_proxy = TestDeviceProxy("STAT/RECV/1")
recv_proxy.off()
recv_proxy.initialise()
recv_proxy.on()
# Verify attribute is present (all zeros if never used before)
HBAT_delays_r1 = numpy.array(recv_proxy.read_attribute('HBAT_BF_delays_RW').value)
self.assertIsNotNone(HBAT_delays_r1)
# Verify writing operation does not lead to errors
self.proxy.HBAT_set_pointing(numpy.array([["J2000","0deg","0deg"]] * 96).flatten()) # write values to RECV
HBAT_delays_r2 = numpy.array(recv_proxy.read_attribute('HBAT_BF_delays_RW').value)
self.assertIsNotNone(HBAT_delays_r2)
# Verify delays changed (to be discussed)
#self.assertFalse((HBAT_delays_r1==HBAT_delays_r2).all())
......@@ -7,13 +7,12 @@
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import numpy
from tango import DevState
from tango.test_context import DeviceTestContext
from tangostationcontrol.devices import beam
from tangostationcontrol.devices import beam, lofar_device
import numpy
import mock
from tangostationcontrol.test import base
......@@ -24,14 +23,13 @@ class TestBeamDevice(base.TestCase):
def setUp(self):
super(TestBeamDevice, self).setUp()
# lofar_device init_device will launch a DeviceProxy not captured by
# the TestDeviceContext making it fail.
# Patch init_device and force match spec
init_patcher = mock.patch.object(
beam.Beam, 'init_device', spec=beam.Beam.init_device)
self.m_init = init_patcher.start()
self.addCleanup(init_patcher.stop)
# Patch DeviceProxy to allow making the proxies during initialisation
# that we otherwise avoid using
for device in [beam, lofar_device]:
proxy_patcher = mock.patch.object(
device, 'DeviceProxy')
proxy_patcher.start()
self.addCleanup(proxy_patcher.stop)
def test_get_pointing_directions(self):
"""Verify can read pointings attribute and length matches without err"""
......@@ -39,81 +37,38 @@ class TestBeamDevice(base.TestCase):
self.assertEqual(96, len(proxy.read_attribute(
"HBAT_pointing_direction_R").value))
def test_get_pointing_epochs(self):
"""Verify can read epochs attribute and length matches without err"""
def test_get_pointing_timestamps(self):
"""Verify can read timestamps attribute and length matches without err"""
with DeviceTestContext(beam.Beam, process=True) as proxy:
self.assertEqual(96, len(proxy.read_attribute(
"HBAT_pointing_epoch_R").value))
def test_set_pointing_direction(self):
"""Verify can set pointings attribute without error"""
with DeviceTestContext(beam.Beam, process=True) as proxy:
proxy.init()
proxy.Initialise()
self.assertEqual(DevState.STANDBY, proxy.state())
self.assertEqual(0, proxy.set_direction_pointings(numpy.zeros(96)))
def test_set_pointing_epochs(self):
"""Verify can set epochs attribute without error"""
with DeviceTestContext(beam.Beam, process=True) as proxy:
proxy.init()
proxy.Initialise()
self.assertEqual(DevState.STANDBY, proxy.state())
self.assertEqual(0, proxy.set_direction_epochs(numpy.zeros(96)))
def pointing(self, attribute: str, lambd):
data = numpy.arange(0, 96)
with DeviceTestContext(beam.Beam, process=True) as proxy:
proxy.init()
proxy.Initialise()
self.assertEqual(DevState.STANDBY, proxy.state())
# Evaluate default all zeros are present using numpy array compare
compare_obj = numpy.zeros(96) == proxy.read_attribute(
attribute).value
self.assertTrue(compare_obj.all())
# Set direction pointings to range of incrementing values
self.assertEqual(0, lambd(proxy, data))
# Verify attribute has been updated with correct data
compare_obj = data == proxy.read_attribute(attribute).value
self.assertTrue(compare_obj.all())
def test_direction_pointing(self):
"""Set and Get test with actual values for pointing attribute"""
self.pointing("HBAT_pointing_direction_R", lambda x, y:
x.set_direction_pointings(y))
def test_direction_epochs(self):
"""Set and Get test with actual values for pointing attribute"""
self.pointing("HBAT_pointing_epoch_R", lambda x, y:
x.set_direction_epochs(y))
def test_pointing_invalid(self):
"""Test that set pointings command refuses invalid lengths"""
"HBAT_pointing_timestamp_R").value))
def test_HBAT_delays_dims(self):
"""Verify HBAT delays are retrieved with correct dimensions"""
with DeviceTestContext(beam.Beam, process=True) as proxy:
proxy.init()
proxy.Initialise()
self.assertEqual(DevState.STANDBY, proxy.state())
proxy.set_defaults()
proxy.on()
self.assertEqual(DevState.ON, proxy.state())
# should return error due to invalid length
self.assertEqual(-1, proxy.set_direction_pointings(numpy.zeros(55)))
def test_epoch_invalid(self):
"""Test that set epochs command refuses invalid lengths"""
# verify HBAT_delays method returns the correct dimensions
HBAT_delays = proxy.HBAT_delays(numpy.array([["J2000","0deg","0deg"]] * 96).flatten())
self.assertEqual((96*16,), HBAT_delays.shape)
def test_HBAT_delays_calculations(self):
"""Verify the calculations from delays to weights"""
with DeviceTestContext(beam.Beam, process=True) as proxy:
proxy.init()
proxy.Initialise()
self.assertEqual(DevState.STANDBY, proxy.state())
# should return error due to invalid length
self.assertEqual(-1, proxy.set_direction_epochs(numpy.zeros(55)))
proxy.set_defaults()
proxy.on()
self.assertEqual(DevState.ON, proxy.state())
# verify if values are actually transformed
HBAT_delays = proxy.HBAT_delays(numpy.array([["J2000","0deg","0deg"]] * 96).flatten())
HBAT_bf_delays = beam.Beam._calculate_HBAT_bf_delays(HBAT_delays)
self.assertNotEqual(HBAT_delays, HBAT_bf_delays)
......@@ -7,6 +7,7 @@ skipsdist = True
usedevelop = True
sitepackages = True
install_command = pip3 install {opts} {packages}
passenv = HOME
setenv =
VIRTUAL_ENV={envdir}
PYTHONWARNINGS=default::DeprecationWarning
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment