Skip to content
Snippets Groups Projects
Commit 1c2a6154 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'improve-pointing-representation' into 'master'

L2SS-1501: Improve string representation of pointings, and introduced a neutral default pointing 'None'

See merge request !726
parents 1db6aa56 158107f0
No related branches found
No related tags found
1 merge request!726L2SS-1501: Improve string representation of pointings, and introduced a neutral default pointing 'None'
...@@ -2,17 +2,71 @@ ...@@ -2,17 +2,71 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import datetime import datetime
from functools import lru_cache
from typing import TypedDict
import casacore.measures import casacore.measures
import numpy import numpy
class CasacoreQuantum(TypedDict):
"""A casacore::Quantum object as returned by casacore.measures."""
unit: str
value: float
class CasacoreMDirection(TypedDict):
"""A casacore::MDirection object as returned by casacore.measures."""
type: str
refer: str
m1: CasacoreQuantum
m2: CasacoreQuantum
@lru_cache
def pointing_to_str(pointing: tuple[str, str, str]) -> str:
"""Convert a pointing tuple (direction_type, angle0, angle1) into a coincise string to display."""
direction_type, angle0, angle1 = pointing
def display_angle(angle: str) -> str:
"""Cut off lengthy precision, if possible."""
if angle.endswith("rad"):
# Radians, round to 5 digits
try:
angle_amount = float(angle[:-3])
except ValueError:
return angle
return "{0:.5}rad".format(angle_amount)
if angle.endswith("deg"):
# Degrees, round to 3 digits
try:
angle_amount = float(angle[:-3])
except ValueError:
return angle
return "{0:.3}deg".format(angle_amount)
return angle
return "{direction_type} ({angle0}, {angle1})".format(
direction_type=direction_type,
angle0=display_angle(angle0),
angle1=display_angle(angle1),
)
def subtract(a, b) -> numpy.ndarray: def subtract(a, b) -> numpy.ndarray:
return numpy.array([x - y for x, y in zip(a, b)]) return numpy.array([x - y for x, y in zip(a, b)])
class Delays: class Delays:
def __init__(self, itrf: list[[float]]): def __init__(self, itrf: tuple[float, float, float]):
"""Create a measure object, configured for the specified terrestrial location.""" """Create a measure object, configured for the specified terrestrial location."""
measure = casacore.measures.measures() measure = casacore.measures.measures()
...@@ -33,18 +87,25 @@ class Delays: ...@@ -33,18 +87,25 @@ class Delays:
if not self.measure.do_frame(frame_time): if not self.measure.do_frame(frame_time):
raise ValueError(f"measure.do_frame failed for UTC time {utc_time_str}") raise ValueError(f"measure.do_frame failed for UTC time {utc_time_str}")
def get_direction_vector(self, pointing: numpy.ndarray) -> numpy.ndarray: def get_direction_vector(self, pointing: list[str]) -> numpy.ndarray:
"""Compute direction vector for a given pointing, relative to the measure.""" """Compute direction vector for a given pointing, relative to the measure."""
return self.get_direction_vector_bulk([pointing]).flatten() return self.get_direction_vector_bulk([pointing]).flatten()
def get_direction_vector_bulk(self, pointings: numpy.ndarray) -> numpy.ndarray: def get_direction_vector_bulk(self, pointings: list[list[str]]) -> numpy.ndarray:
"""Compute direction vectors for the given pointings, relative to the measure.""" """Compute direction vectors for the given pointings, relative to the measure."""
angles0 = numpy.empty(len(pointings)) angles0 = numpy.empty(len(pointings))
angles1 = numpy.empty(len(pointings)) angles1 = numpy.empty(len(pointings))
for idx, pointing in enumerate(pointings): for idx, pointing in enumerate(pointings):
angles = self.measure.measure(pointing, "ITRF") direction: CasacoreMDirection | None = self._pointing_to_direction(pointing)
if direction is None:
# uninitialised pointings
angles0[idx] = 0
angles1[idx] = 0
else:
angles = self.measure.measure(direction, "ITRF")
angles0[idx] = angles["m0"]["value"] angles0[idx] = angles["m0"]["value"]
angles1[idx] = angles["m1"]["value"] angles1[idx] = angles["m1"]["value"]
...@@ -61,16 +122,32 @@ class Delays: ...@@ -61,16 +122,32 @@ class Delays:
# Return array [directions][angles] # Return array [directions][angles]
return direction_vectors.T return direction_vectors.T
def is_valid_direction(self, direction) -> bool: def _pointing_to_direction(
self, pointing: tuple[str, str, str]
) -> CasacoreMDirection | None:
try:
if pointing[0] == "None":
# uninitialised direction
return None
return self.measure.direction(*pointing)
except (RuntimeError, TypeError, KeyError, IndexError) as e:
raise ValueError(f"Invalid pointing: {pointing}") from e
def is_valid_pointing(self, pointing: tuple[str, str, str]) -> bool:
"""Check validity of the direction measure""" """Check validity of the direction measure"""
try: try:
_ = self.measure.direction(*direction) _ = self._pointing_to_direction(pointing)
except (RuntimeError, TypeError) as e: except ValueError as e:
return False return False
return True return True
def delays(self, direction, antenna_absolute_itrf: list[[float]]) -> numpy.ndarray: def delays(
self,
pointing: list[str],
antenna_absolute_itrf: list[tuple[float, float, float]],
) -> numpy.ndarray:
"""Get the delays for a direction and *absolute* antenna positions. """Get the delays for a direction and *absolute* antenna positions.
These are the delays that have to be applied to the signal chain in order to line up the signal. These are the delays that have to be applied to the signal chain in order to line up the signal.
...@@ -79,12 +156,12 @@ class Delays: ...@@ -79,12 +156,12 @@ class Delays:
Returns delays[antenna].""" Returns delays[antenna]."""
return self.delays_bulk( return self.delays_bulk(
numpy.array([direction]), numpy.array([pointing]),
numpy.array(antenna_absolute_itrf) - self.reference_itrf, numpy.array(antenna_absolute_itrf) - self.reference_itrf,
).flatten() ).flatten()
def delays_bulk( def delays_bulk(
self, directions: numpy.ndarray, antenna_relative_itrfs: numpy.ndarray self, pointings: numpy.ndarray, antenna_relative_itrfs: numpy.ndarray
) -> numpy.ndarray: ) -> numpy.ndarray:
"""Get the delays for each direction and each *relative* antenna position. """Get the delays for each direction and each *relative* antenna position.
...@@ -94,11 +171,6 @@ class Delays: ...@@ -94,11 +171,6 @@ class Delays:
Returns delays[antenna][direction].""" Returns delays[antenna][direction]."""
# obtain the direction vector for each pointing # obtain the direction vector for each pointing
try:
pointings = [self.measure.direction(*direction) for direction in directions]
except (RuntimeError, TypeError) as e:
raise ValueError("Invalid direction") from e
direction_vectors = self.get_direction_vector_bulk(pointings) direction_vectors = self.get_direction_vector_bulk(pointings)
# compute the corresponding delays for all directions # compute the corresponding delays for all directions
......
...@@ -26,7 +26,7 @@ from tango import ( ...@@ -26,7 +26,7 @@ from tango import (
# PyTango imports # PyTango imports
from tango.server import attribute, command, device_property from tango.server import attribute, command, device_property
from tangostationcontrol.beam.delays import Delays from tangostationcontrol.beam.delays import Delays, pointing_to_str
from tangostationcontrol.beam.managers import AbstractBeamManager from tangostationcontrol.beam.managers import AbstractBeamManager
from tangostationcontrol.common.constants import MAX_POINTINGS, N_point_prop from tangostationcontrol.common.constants import MAX_POINTINGS, N_point_prop
...@@ -132,7 +132,7 @@ class BeamDevice(AsyncDevice): ...@@ -132,7 +132,7 @@ class BeamDevice(AsyncDevice):
dtype=(str,), dtype=(str,),
max_dim_x=MAX_POINTINGS, max_dim_x=MAX_POINTINGS,
fget=lambda self: [ fget=lambda self: [
"{0} ({1}, {2})".format(*x) pointing_to_str(tuple(x))
for x in self._beam_manager.current_pointing_direction for x in self._beam_manager.current_pointing_direction
], ],
fisallowed="is_attribute_access_allowed", fisallowed="is_attribute_access_allowed",
...@@ -241,7 +241,7 @@ class BeamDevice(AsyncDevice): ...@@ -241,7 +241,7 @@ class BeamDevice(AsyncDevice):
) )
for pointing in value: for pointing in value:
if not self.generic_delay_calculator.is_valid_direction(pointing): if not self.generic_delay_calculator.is_valid_pointing(pointing):
raise ValueError(f"Invalid direction: {pointing}") raise ValueError(f"Invalid direction: {pointing}")
# store the new values # store the new values
...@@ -299,7 +299,7 @@ class BeamDevice(AsyncDevice): ...@@ -299,7 +299,7 @@ class BeamDevice(AsyncDevice):
(num_pointings, N_point_prop), dtype="<U32" (num_pointings, N_point_prop), dtype="<U32"
) )
self._beam_manager.new_pointing_direction = numpy.array( self._beam_manager.new_pointing_direction = numpy.array(
[["AZELGEO", "0rad", "1.570796rad"]] * num_pointings, dtype="<U32" [["None", "", ""]] * num_pointings, dtype="<U32"
) )
self._beam_manager.beam_tracking_application_offset = ( self._beam_manager.beam_tracking_application_offset = (
......
...@@ -37,37 +37,24 @@ class TestDelays(base.TestCase): ...@@ -37,37 +37,24 @@ class TestDelays(base.TestCase):
self.assertRaises(ValueError, Delays, [0, 0, 0]) self.assertRaises(ValueError, Delays, [0, 0, 0])
def test_is_valid_direction(self): def test_is_valid_pointing(self):
d = Delays([0, 0, 0]) d = Delays([0, 0, 0])
# should accept base use cases # should accept base use cases
self.assertTrue(d.is_valid_direction(("J2000", "0rad", "0rad"))) self.assertTrue(d.is_valid_pointing(("J2000", "0rad", "0rad")))
self.assertTrue(d.is_valid_direction(("J2000", "4.712389rad", "1.570796rad"))) self.assertTrue(d.is_valid_pointing(("J2000", "4.712389rad", "1.570796rad")))
self.assertTrue(d.is_valid_direction(("AZELGEO", "0rad", "0rad"))) self.assertTrue(d.is_valid_pointing(("AZELGEO", "0rad", "0rad")))
self.assertTrue(d.is_valid_direction(("AZELGEO", "4.712389rad", "1.570796rad"))) self.assertTrue(d.is_valid_pointing(("AZELGEO", "4.712389rad", "1.570796rad")))
self.assertTrue(d.is_valid_direction(("SUN", "0rad", "0rad"))) self.assertTrue(d.is_valid_pointing(("SUN", "0rad", "0rad")))
self.assertTrue(d.is_valid_pointing(("None", "", "")))
# i dont get these either, but casacore accepts them
self.assertTrue(d.is_valid_direction([]))
self.assertTrue(d.is_valid_direction(("J2000",)))
self.assertTrue(
d.is_valid_direction(
(
"J2000",
"0rad",
)
)
)
self.assertTrue(d.is_valid_direction(("J2000", "0rad", "0rad", "0rad")))
# should not throw, and return False, on bad uses # should not throw, and return False, on bad uses
self.assertFalse(d.is_valid_direction(("", "", ""))) self.assertFalse(d.is_valid_pointing([]))
self.assertFalse( self.assertFalse(d.is_valid_pointing(("", "", "")))
d.is_valid_direction(("J2000", "0rad", "0rad", "0rad", "0rad")) self.assertFalse(d.is_valid_pointing(("J2000", "0rad", "0rad", "0rad", "0rad")))
) self.assertFalse(d.is_valid_pointing((1, 2, 3)))
self.assertFalse(d.is_valid_direction((1, 2, 3))) self.assertFalse(d.is_valid_pointing("foo"))
self.assertFalse(d.is_valid_direction("foo")) self.assertFalse(d.is_valid_pointing(None))
self.assertFalse(d.is_valid_direction(None))
def test_sun(self): def test_sun(self):
# # create a frame tied to the reference position # # create a frame tied to the reference position
...@@ -80,10 +67,9 @@ class TestDelays(base.TestCase): ...@@ -80,10 +67,9 @@ class TestDelays(base.TestCase):
d.set_measure_time(timestamp) d.set_measure_time(timestamp)
# point to the sun # point to the sun
direction = "SUN", "0rad", "0rad" pointing = "SUN", "0rad", "0rad"
# calculate the delays based on the set reference position, the set time and now the set direction and antenna positions. # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions.
pointing = d.measure.direction(*direction)
direction = d.get_direction_vector(pointing) direction = d.get_direction_vector(pointing)
""" """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment