Skip to content
Snippets Groups Projects
Commit d57ef26c authored by Corné Lukken's avatar Corné Lukken
Browse files

Merge branch 'L2SS-572-refactor-delays' into 'master'

Resolve L2SS-572 "Refactor delays"

Closes L2SS-572

See merge request !396
parents 036801d0 70898457
Branches
Tags
1 merge request!396Resolve L2SS-572 "Refactor delays"
......@@ -2,11 +2,11 @@ import casacore.measures
import numpy
import datetime
def subtract(a, b):
def subtract(a, b) -> numpy.ndarray:
return numpy.array([x - y for x, y in zip(a, b)])
class delay_calculator:
class Delays:
def __init__(self, itrf: list([float])):
""" Create a measure object, configured for the specified terrestrial location. """
......@@ -14,8 +14,8 @@ class delay_calculator:
measure = casacore.measures.measures()
frame_location = measure.position("ITRF", *[f"{x}m" for x in itrf])
result = measure.do_frame(frame_location)
assert result == True
if not measure.do_frame(frame_location):
raise ValueError(f"measure.do_frame failed for ITRF location {itrf}")
self.reference_itrf = itrf
self.measure = measure
......@@ -23,10 +23,11 @@ class delay_calculator:
def set_measure_time(self, utc_time: datetime.datetime):
""" Configure the measure object for the specified time. """
frame_time = self.measure.epoch("UTC", utc_time.isoformat(' '))
utc_time_str = utc_time.isoformat(' ')
frame_time = self.measure.epoch("UTC", utc_time_str)
result = self.measure.do_frame(frame_time)
assert result == True
if not self.measure.do_frame(frame_time):
raise ValueError(f"measure.do_frame failed for UTC time {utc_time_str}")
def get_direction_vector(self, pointing: numpy.ndarray) -> numpy.ndarray:
""" Compute direction vector for a given pointing, relative to the measure. """
......@@ -53,7 +54,8 @@ class delay_calculator:
# Return array [directions][angles]
return direction_vectors.T
def is_valid_direction(self, direction):
def is_valid_direction(self, direction) -> bool:
""" Check validity of the direction measure """
try:
_ = self.measure.direction(*direction)
except (RuntimeError, TypeError) as e:
......@@ -61,14 +63,14 @@ class delay_calculator:
return True
def convert(self, direction, antenna_absolute_itrf: list([float])):
def delays(self, direction, antenna_absolute_itrf: list([float])) -> numpy.ndarray:
""" Get the delays for a direction and *absolute* antenna positions.
Returns delays[antenna]. """
return self.convert_bulk([direction], numpy.array(antenna_absolute_itrf) - self.reference_itrf).flatten()
return self.delays_bulk([direction], numpy.array(antenna_absolute_itrf) - self.reference_itrf).flatten()
def convert_bulk(self, directions: numpy.ndarray, antenna_relative_itrfs: numpy.ndarray) -> numpy.ndarray:
def delays_bulk(self, directions: numpy.ndarray, antenna_relative_itrfs: numpy.ndarray) -> numpy.ndarray:
""" Get the delays for each direction and each *relative* antenna position.
Returns delays[antenna][direction]. """
......
......@@ -24,7 +24,7 @@ from tangostationcontrol.common.measures import get_measures_directory, get_avai
from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
from tangostationcontrol.devices.device_decorators import TimeIt, only_in_states, fault_on_error
from tangostationcontrol.beam.delays import delay_calculator
from tangostationcontrol.beam.delays import Delays
from tangostationcontrol.devices.lofar_device import lofar_device
__all__ = ["beam_device", "main", "BeamTracker"]
......@@ -219,7 +219,7 @@ class beam_device(lofar_device):
self.Beam_tracker = None
# generic delay calculator to ask about validity of settings
self.generic_delay_calculator = delay_calculator([0, 0, 0])
self.generic_delay_calculator = Delays([0, 0, 0])
# Derived classes will override this with a non-parameterised
# version that lofar_device will call.
......
......@@ -17,7 +17,7 @@ from tangostationcontrol.devices.beam_device import beam_device
from tangostationcontrol.devices.sdp.beamlet import Beamlet
from tangostationcontrol.devices.device_decorators import TimeIt
from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.beam.delays import delay_calculator
from tangostationcontrol.beam.delays import Delays
import numpy
import datetime
......@@ -137,7 +137,7 @@ class DigitalBeam(beam_device):
input_itrf[input_nr] = antenna_itrf[antenna_nr]
# a delay calculator
self.delay_calculator = delay_calculator(reference_itrf)
self.delay_calculator = Delays(reference_itrf)
# relative positions of each antenna
self.relative_input_positions = input_itrf - reference_itrf
......@@ -165,7 +165,7 @@ class DigitalBeam(beam_device):
d.set_measure_time(timestamp)
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions
delays = d.convert_bulk(pointing_direction, self.relative_input_positions)
delays = d.delays_bulk(pointing_direction, self.relative_input_positions)
return delays
......
......@@ -16,7 +16,7 @@ from tango import Util
# Additional import
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions
from tangostationcontrol.beam.delays import delay_calculator
from tangostationcontrol.beam.delays import Delays
from tangostationcontrol.beam.hba_tile import NUMBER_OF_ELEMENTS_PER_TILE
from tangostationcontrol.devices.beam_device import beam_device
from tangostationcontrol.devices.device_decorators import TimeIt
......@@ -61,7 +61,7 @@ class TileBeam(beam_device):
HBAT_antenna_itrf_offsets = self.antennafield_proxy.HBAT_antenna_itrf_offsets_R.reshape(self._nr_tiles, NUMBER_OF_ELEMENTS_PER_TILE, 3)
# a delay calculator for each tile
self.HBAT_delay_calculators = [delay_calculator(reference_itrf) for reference_itrf in HBAT_reference_itrf]
self.HBAT_delay_calculators = [Delays(reference_itrf) for reference_itrf in HBAT_reference_itrf]
# absolute positions of each antenna element
self.HBAT_antenna_positions = [HBAT_reference_itrf[tile] + HBAT_antenna_itrf_offsets[tile] for tile in range(self._nr_tiles)]
......@@ -85,7 +85,7 @@ class TileBeam(beam_device):
d.set_measure_time(timestamp)
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions
delays[tile] = d.convert(pointing_direction[tile], self.HBAT_antenna_positions[tile])
delays[tile] = d.delays(pointing_direction[tile], self.HBAT_antenna_positions[tile])
return delays
......
import datetime
import time
import logging
import mock
import numpy
import numpy.testing
import casacore
from tangostationcontrol.beam.delays import delay_calculator
from tangostationcontrol.beam.delays import Delays
from tangostationcontrol.test import base
class TestDelays(base.TestCase):
def test_init(self):
"""
Fail condition is simply the object creation failing
"""
"""Fail condition is simply the object creation failing"""
reference_itrf = [3826577.066, 461022.948, 5064892.786] # CS002LBA, in ITRF2005 epoch 2012.5
d = delay_calculator(reference_itrf)
d = Delays(reference_itrf)
self.assertIsNotNone(d)
def test_init_fails(self):
"""Test do_measure returning false is correctly caught"""
with mock.patch.object(casacore.measures, "measures") as m_measure:
m_measure.return_value.do_frame.return_value = False
self.assertRaises(ValueError, Delays, [0, 0, 0])
def test_is_valid_direction(self):
d = delay_calculator([0, 0, 0])
d = Delays([0, 0, 0])
# should accept base use cases
self.assertTrue(d.is_valid_direction(("J2000", "0deg", "0deg")))
......@@ -45,7 +54,7 @@ class TestDelays(base.TestCase):
def test_sun(self):
# # create a frame tied to the reference position
reference_itrf = [3826577.066, 461022.948, 5064892.786]
d = delay_calculator(reference_itrf)
d = Delays(reference_itrf)
for i in range(24):
......@@ -78,7 +87,7 @@ class TestDelays(base.TestCase):
def test_identical_location(self):
# # create a frame tied to the reference position
reference_itrf = [3826577.066, 461022.948, 5064892.786] # CS002LBA, in ITRF2005 epoch 2012.5
d = delay_calculator(reference_itrf)
d = Delays(reference_itrf)
# set the antenna position identical to the reference position
antenna_itrf = [[reference_itrf[0], reference_itrf[1], reference_itrf[2]]] # CS001LBA, in ITRF2005 epoch 2012.5
......@@ -93,13 +102,13 @@ class TestDelays(base.TestCase):
direction = "J2000", "0deg", "0deg"
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions.
delays = d.convert(direction, antenna_itrf)
delays = d.delays(direction, antenna_itrf)
self.assertListEqual(delays.tolist(), [0.0], msg=f"delays = {delays}")
def test_regression(self):
reference_itrf = [3826577.066, 461022.948, 5064892.786] # CS002LBA, in ITRF2005 epoch 2012.5
d = delay_calculator(reference_itrf)
d = Delays(reference_itrf)
# set the antenna position identical to the reference position
antenna_itrf = [[3826923.503, 460915.488, 5064643.517]] # CS001LBA, in ITRF2005 epoch 2012.5
......@@ -112,7 +121,7 @@ class TestDelays(base.TestCase):
direction = "J2000", "0deg", "90deg"
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions.
delays = d.convert(direction, antenna_itrf)
delays = d.delays(direction, antenna_itrf)
# check for regression
self.assertAlmostEqual(-8.31467564781444e-07, delays[0], delta=1.0e-10)
......@@ -123,7 +132,7 @@ class TestDelays(base.TestCase):
"""
reference_itrf = [0, 0, 0]
d = delay_calculator(reference_itrf)
d = Delays(reference_itrf)
# set the antenna position 0.1 lightsecond in the Z direction of the ITRF,
# which is aligned with the North Pole, see
......@@ -140,12 +149,12 @@ class TestDelays(base.TestCase):
direction = "J2000", "0deg", "90deg"
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions.
delays = d.convert(direction, antenna_itrf)
delays = d.delays(direction, antenna_itrf)
self.assertAlmostEqual(0.1, delays[0], 6, f"delays[0] = {delays[0]}")
def test_convert_bulk(self):
d = delay_calculator([0, 0, 0])
def test_delays_bulk(self):
d = Delays([0, 0, 0])
timestamp = datetime.datetime(2022, 3, 1, 0, 0, 0) # timestamp does not actually matter, but casacore doesn't know that.
d.set_measure_time(timestamp)
......@@ -153,20 +162,20 @@ class TestDelays(base.TestCase):
positions = numpy.array([[i,2,3] for i in range(5)])
directions = numpy.array([["J2000", f"{i}deg", f"{i}deg"] for i in range(90)])
bulk_result = d.convert_bulk(directions, positions)
bulk_result = d.delays_bulk(directions, positions)
# verify parallellisation along direction axis
for i, single_dir in enumerate(directions):
single_dir_result = d.convert_bulk([single_dir], positions)
single_dir_result = d.delays_bulk([single_dir], positions)
numpy.testing.assert_almost_equal(single_dir_result[:, 0], bulk_result[:, i], 4)
# verify parallellisation along position axis
for i, single_pos in enumerate(positions):
single_pos_result = d.convert_bulk(directions, [single_pos])
single_pos_result = d.delays_bulk(directions, [single_pos])
numpy.testing.assert_almost_equal(single_pos_result[0, :], bulk_result[i, :], 4)
def test_convert_bulk_speed(self):
d = delay_calculator([0, 0, 0])
def test_delays_bulk_speed(self):
d = Delays([0, 0, 0])
timestamp = datetime.datetime(2022, 3, 1, 0, 0, 0) # timestamp does not actually matter, but casacore doesn't know that.
d.set_measure_time(timestamp)
......@@ -176,6 +185,6 @@ class TestDelays(base.TestCase):
count = 10
before = time.monotonic_ns()
for _ in range(count):
_ = d.convert_bulk(directions, positions)
_ = d.delays_bulk(directions, positions)
after = time.monotonic_ns()
logging.error(f"convert_bulk averaged {(after - before)/count/1e6} ms to convert 488 directions for 96 antennas.")
logging.error(f"delays bulk averaged {(after - before)/count/1e6} ms to convert 488 directions for 96 antennas.")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment