Skip to content
Snippets Groups Projects
Commit 7aa4d715 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-771-speedup-delay-calc' into 'master'

L2SS-771: Speed up delay calculations by using a lot more numpy...

Closes L2SS-771

See merge request !330
parents e1d93b27 aacb6fbd
No related branches found
No related tags found
1 merge request!330L2SS-771: Speed up delay calculations by using a lot more numpy...
......@@ -90,6 +90,12 @@
"DigitalBeam": {
"STAT/DigitalBeam/1": {
"properties": {
"Beam_tracking_interval": [
"1.0"
],
"Beam_tracking_preparation_period": [
"0.5"
]
}
}
}
......
import casacore.measures
from math import sin, cos
import numpy
import datetime
......@@ -29,26 +28,30 @@ class delay_calculator:
result = self.measure.do_frame(frame_time)
assert result == True
def get_direction_vector(self, pointing: dict):
""" Compute a direction vector for the given pointing, relative to the measure. """
angles = self.measure.measure(pointing, "ITRF")
angle0, angle1 = angles["m0"]["value"], angles["m1"]["value"]
def get_direction_vector(self, pointing: numpy.ndarray) -> numpy.ndarray:
""" Compute direction vector for a given pointing, relative to the measure. """
# Convert polar to carthesian coordinates
# see also https://github.com/casacore/casacore/blob/e793b3d5339d828a60339d16476bf688a19df3ec/casa/Quanta/MVDirection.cc#L67
direction_vector = numpy.array(
(cos(angle0) * cos(angle1),
sin(angle0) * cos(angle1),
sin(angle1)))
return self.get_direction_vector_bulk([pointing]).flatten()
def get_direction_vector_bulk(self, pointings: numpy.ndarray) -> numpy.ndarray:
""" Compute direction vectors for the given pointings, relative to the measure. """
return direction_vector
angles0 = numpy.empty(len(pointings))
angles1 = numpy.empty(len(pointings))
for idx, pointing in enumerate(pointings):
angles = self.measure.measure(pointing, "ITRF")
angles0[idx] = angles["m0"]["value"]
angles1[idx] = angles["m1"]["value"]
def _get_delay(self, reference_direction_vector: numpy.array, relative_itrf: numpy.array) -> float:
""" Compute the delay to apply, in seconds, to align signals for a distance of `relative_itrf` in the
direction of `reference_direction_vector`. """
speed_of_light = 299792458.0
# Convert polar to carthesian coordinates
# see also https://github.com/casacore/casacore/blob/e793b3d5339d828a60339d16476bf688a19df3ec/casa/Quanta/MVDirection.cc#L67
direction_vectors = numpy.array(
[numpy.cos(angles0) * numpy.cos(angles1),
numpy.sin(angles0) * numpy.cos(angles1),
numpy.sin(angles1)])
return numpy.dot(reference_direction_vector, relative_itrf) / speed_of_light
# Return array [directions][angles]
return direction_vectors.T
def is_valid_direction(self, direction):
try:
......@@ -58,18 +61,34 @@ class delay_calculator:
return True
def convert(self, direction, antenna_itrf: list([float])):
# explicitly check validity to get a proper error message
if not self.is_valid_direction(direction):
raise ValueError(f"Invalid direction: {direction}")
def convert(self, direction, antenna_absolute_itrf: list([float])):
""" Get the delays for a direction and *absolute* antenna positions.
Returns delays[antenna]. """
# obtain the direction vector for a specific pointing
pointing = self.measure.direction(*direction)
return self.convert_bulk([direction], numpy.array(antenna_absolute_itrf) - self.reference_itrf).flatten()
reference_dir_vector = self.get_direction_vector(pointing)
def convert_bulk(self, directions: numpy.ndarray, antenna_relative_itrfs: numpy.ndarray) -> numpy.ndarray:
""" Get the delays for each direction and each *relative* antenna position.
Returns delays[antenna][direction]. """
# # compute the delays for an antennas w.r.t. the reference position
antenna_relative_itrf = [subtract(pos, self.reference_itrf) for pos in antenna_itrf]
delays = [self._get_delay(reference_dir_vector, relative_itrf) for relative_itrf in antenna_relative_itrf]
# obtain the direction vector for each pointing
try:
pointings = [self.measure.direction(*direction) for direction in directions]
except (RuntimeError, TypeError) as e:
raise ValueError("Invalid direction") from e
direction_vectors = self.get_direction_vector_bulk(pointings)
# compute the corresponding delays for all directions
def get_delay_all_directions(relative_itrf):
# Dot product between relative position and angle vector determines
# distance. Divide by speed of light to obtain delay.
return numpy.inner(relative_itrf, direction_vectors) / 299792458.0
# apply for each position
delays = numpy.apply_along_axis(get_delay_all_directions, 1, antenna_relative_itrfs)
return delays
......@@ -90,8 +90,11 @@ class attribute_wrapper(attribute):
"""
read_func_wrapper reads the attribute value, stores it and returns it"
"""
device.value_dict[self] = self.read_function()
return device.value_dict[self]
try:
device.value_dict[self] = self.read_function()
return device.value_dict[self]
except Exception as e:
raise e.__class__(f"Could not read attribute {comms_annotation}") from e
self.fget = read_func_wrapper
......
......@@ -89,8 +89,8 @@ class DigitalBeam(beam_device):
# a delay calculator
self.delay_calculator = delay_calculator(reference_itrf)
# absolute positions of each antenna element
self.antenna_positions = antenna_itrf
# relatvie positions of each antenna
self.relative_antenna_positions = antenna_itrf - reference_itrf
# use all antennas unless specified otherwise
self._antenna_select = numpy.ones((self.NUM_ANTENNAS, self.NUM_BEAMLETS), dtype=numpy.bool_)
......@@ -112,9 +112,8 @@ class DigitalBeam(beam_device):
d = self.delay_calculator
d.set_measure_time(timestamp)
for beamlet in range(self.NUM_BEAMLETS):
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions
delays[:, beamlet] = d.convert(pointing_direction[beamlet], self.antenna_positions)
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions
delays = d.convert_bulk(pointing_direction, self.relative_antenna_positions)
return delays
......
import datetime
import time
import logging
import numpy
import numpy.testing
from tangostationcontrol.beam.delays import delay_calculator
from tangostationcontrol.test import base
class TestDelays(base.TestCase):
def test_init(self):
"""
......@@ -94,7 +95,27 @@ class TestDelays(base.TestCase):
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions.
delays = d.convert(direction, antenna_itrf)
self.assertListEqual(delays, [0.0], msg=f"delays = {delays}")
self.assertListEqual(delays.tolist(), [0.0], msg=f"delays = {delays}")
def test_regression(self):
reference_itrf = [3826577.066, 461022.948, 5064892.786] # CS002LBA, in ITRF2005 epoch 2012.5
d = delay_calculator(reference_itrf)
# set the antenna position identical to the reference position
antenna_itrf = [[3826923.503, 460915.488, 5064643.517]] # CS001LBA, in ITRF2005 epoch 2012.5
# # set the timestamp to solve for
timestamp = datetime.datetime(2000, 1, 1, 0, 0, 0)
d.set_measure_time(timestamp)
# # obtain the direction vector for a specific pointing
direction = "J2000", "0deg", "90deg"
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions.
delays = d.convert(direction, antenna_itrf)
# check for regression
self.assertAlmostEqual(-8.31467564781444e-07, delays[0], delta=1.0e-10)
def test_light_second_delay(self):
"""
......@@ -123,3 +144,39 @@ class TestDelays(base.TestCase):
self.assertAlmostEqual(0.1, delays[0], 6, f"delays[0] = {delays[0]}")
def test_convert_bulk(self):
d = delay_calculator([0, 0, 0])
timestamp = datetime.datetime(2022, 3, 1, 0, 0, 0) # timestamp does not actually matter, but casacore doesn't know that.
d.set_measure_time(timestamp)
# generate different positions and directions
positions = numpy.array([[i,2,3] for i in range(5)])
directions = numpy.array([["J2000", f"{i}deg", f"{i}deg"] for i in range(90)])
bulk_result = d.convert_bulk(directions, positions)
# verify parallellisation along direction axis
for i, single_dir in enumerate(directions):
single_dir_result = d.convert_bulk([single_dir], positions)
numpy.testing.assert_equal(single_dir_result[:, 0], bulk_result[:, i])
# verify parallellisation along position axis
for i, single_pos in enumerate(positions):
single_pos_result = d.convert_bulk(directions, [single_pos])
numpy.testing.assert_equal(single_pos_result[0, :], bulk_result[i, :])
def test_convert_bulk_speed(self):
d = delay_calculator([0, 0, 0])
timestamp = datetime.datetime(2022, 3, 1, 0, 0, 0) # timestamp does not actually matter, but casacore doesn't know that.
d.set_measure_time(timestamp)
positions = numpy.array([[1,2,3]] * 96)
directions = numpy.array([["J2000", "0deg", "0deg"]] * 488)
count = 10
before = time.monotonic_ns()
for _ in range(count):
_ = d.convert_bulk(directions, positions)
after = time.monotonic_ns()
logging.error(f"convert_bulk averaged {(after - before)/count/1e6} ms to convert 488 directions for 96 antennas.")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment