diff --git a/tangostationcontrol/tangostationcontrol/beam/delays.py b/tangostationcontrol/tangostationcontrol/beam/delays.py index 3e824b24ceaa62e0e9217766e47146e4ade93aac..160ecdc810bb33992f21ec860299b9dbae7ab6c9 100644 --- a/tangostationcontrol/tangostationcontrol/beam/delays.py +++ b/tangostationcontrol/tangostationcontrol/beam/delays.py @@ -2,17 +2,71 @@ # SPDX-License-Identifier: Apache-2.0 import datetime +from functools import lru_cache +from typing import TypedDict import casacore.measures import numpy +class CasacoreQuantum(TypedDict): + """A casacore::Quantum object as returned by casacore.measures.""" + + unit: str + value: float + + +class CasacoreMDirection(TypedDict): + """A casacore::MDirection object as returned by casacore.measures.""" + + type: str + refer: str + m1: CasacoreQuantum + m2: CasacoreQuantum + + +@lru_cache +def pointing_to_str(pointing: tuple[str, str, str]) -> str: + """Convert a pointing tuple (direction_type, angle0, angle1) into a coincise string to display.""" + + direction_type, angle0, angle1 = pointing + + def display_angle(angle: str) -> str: + """Cut off lengthy precision, if possible.""" + + if angle.endswith("rad"): + # Radians, round to 5 digits + try: + angle_amount = float(angle[:-3]) + except ValueError: + return angle + + return "{0:.5}rad".format(angle_amount) + + if angle.endswith("deg"): + # Degrees, round to 3 digits + try: + angle_amount = float(angle[:-3]) + except ValueError: + return angle + + return "{0:.3}deg".format(angle_amount) + + return angle + + return "{direction_type} ({angle0}, {angle1})".format( + direction_type=direction_type, + angle0=display_angle(angle0), + angle1=display_angle(angle1), + ) + + def subtract(a, b) -> numpy.ndarray: return numpy.array([x - y for x, y in zip(a, b)]) class Delays: - def __init__(self, itrf: list[[float]]): + def __init__(self, itrf: tuple[float, float, float]): """Create a measure object, configured for the specified terrestrial location.""" measure = casacore.measures.measures() @@ -33,20 +87,27 @@ class Delays: if not self.measure.do_frame(frame_time): raise ValueError(f"measure.do_frame failed for UTC time {utc_time_str}") - def get_direction_vector(self, pointing: numpy.ndarray) -> numpy.ndarray: + def get_direction_vector(self, pointing: list[str]) -> numpy.ndarray: """Compute direction vector for a given pointing, relative to the measure.""" return self.get_direction_vector_bulk([pointing]).flatten() - def get_direction_vector_bulk(self, pointings: numpy.ndarray) -> numpy.ndarray: + def get_direction_vector_bulk(self, pointings: list[list[str]]) -> numpy.ndarray: """Compute direction vectors for the given pointings, relative to the measure.""" angles0 = numpy.empty(len(pointings)) angles1 = numpy.empty(len(pointings)) for idx, pointing in enumerate(pointings): - angles = self.measure.measure(pointing, "ITRF") - angles0[idx] = angles["m0"]["value"] - angles1[idx] = angles["m1"]["value"] + direction: CasacoreMDirection | None = self._pointing_to_direction(pointing) + + if direction is None: + # uninitialised pointings + angles0[idx] = 0 + angles1[idx] = 0 + else: + angles = self.measure.measure(direction, "ITRF") + angles0[idx] = angles["m0"]["value"] + angles1[idx] = angles["m1"]["value"] # Convert polar to carthesian coordinates # see also https://github.com/casacore/casacore/blob/e793b3d5339d828a60339d16476bf688a19df3ec/casa/Quanta/MVDirection.cc#L67 @@ -61,16 +122,32 @@ class Delays: # Return array [directions][angles] return direction_vectors.T - def is_valid_direction(self, direction) -> bool: + def _pointing_to_direction( + self, pointing: tuple[str, str, str] + ) -> CasacoreMDirection | None: + try: + if pointing[0] == "None": + # uninitialised direction + return None + + return self.measure.direction(*pointing) + except (RuntimeError, TypeError, KeyError, IndexError) as e: + raise ValueError(f"Invalid pointing: {pointing}") from e + + def is_valid_pointing(self, pointing: tuple[str, str, str]) -> bool: """Check validity of the direction measure""" try: - _ = self.measure.direction(*direction) - except (RuntimeError, TypeError) as e: + _ = self._pointing_to_direction(pointing) + except ValueError as e: return False return True - def delays(self, direction, antenna_absolute_itrf: list[[float]]) -> numpy.ndarray: + def delays( + self, + pointing: list[str], + antenna_absolute_itrf: list[tuple[float, float, float]], + ) -> numpy.ndarray: """Get the delays for a direction and *absolute* antenna positions. These are the delays that have to be applied to the signal chain in order to line up the signal. @@ -79,12 +156,12 @@ class Delays: Returns delays[antenna].""" return self.delays_bulk( - numpy.array([direction]), + numpy.array([pointing]), numpy.array(antenna_absolute_itrf) - self.reference_itrf, ).flatten() def delays_bulk( - self, directions: numpy.ndarray, antenna_relative_itrfs: numpy.ndarray + self, pointings: numpy.ndarray, antenna_relative_itrfs: numpy.ndarray ) -> numpy.ndarray: """Get the delays for each direction and each *relative* antenna position. @@ -94,11 +171,6 @@ class Delays: Returns delays[antenna][direction].""" # obtain the direction vector for each pointing - try: - pointings = [self.measure.direction(*direction) for direction in directions] - except (RuntimeError, TypeError) as e: - raise ValueError("Invalid direction") from e - direction_vectors = self.get_direction_vector_bulk(pointings) # compute the corresponding delays for all directions diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/beam_device.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/beam_device.py index b7b370aa550a8116edb53dc87dba59b50ae38bef..0e50d03354ffe351a1c74509ae3041ec6f246801 100644 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/beam_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/beam_device.py @@ -26,7 +26,7 @@ from tango import ( # PyTango imports from tango.server import attribute, command, device_property -from tangostationcontrol.beam.delays import Delays +from tangostationcontrol.beam.delays import Delays, pointing_to_str from tangostationcontrol.beam.managers import AbstractBeamManager from tangostationcontrol.common.constants import MAX_POINTINGS, N_point_prop @@ -132,7 +132,7 @@ class BeamDevice(AsyncDevice): dtype=(str,), max_dim_x=MAX_POINTINGS, fget=lambda self: [ - "{0} ({1}, {2})".format(*x) + pointing_to_str(tuple(x)) for x in self._beam_manager.current_pointing_direction ], fisallowed="is_attribute_access_allowed", @@ -241,7 +241,7 @@ class BeamDevice(AsyncDevice): ) for pointing in value: - if not self.generic_delay_calculator.is_valid_direction(pointing): + if not self.generic_delay_calculator.is_valid_pointing(pointing): raise ValueError(f"Invalid direction: {pointing}") # store the new values @@ -299,7 +299,7 @@ class BeamDevice(AsyncDevice): (num_pointings, N_point_prop), dtype="<U32" ) self._beam_manager.new_pointing_direction = numpy.array( - [["AZELGEO", "0rad", "1.570796rad"]] * num_pointings, dtype="<U32" + [["None", "", ""]] * num_pointings, dtype="<U32" ) self._beam_manager.beam_tracking_application_offset = ( diff --git a/tangostationcontrol/test/beam/test_delays.py b/tangostationcontrol/test/beam/test_delays.py index 7d4173c6fdf49d58abda959509d4fbfc70d5efcb..e83ef8611b794450fcd99d1418ac3b359c348317 100644 --- a/tangostationcontrol/test/beam/test_delays.py +++ b/tangostationcontrol/test/beam/test_delays.py @@ -37,37 +37,24 @@ class TestDelays(base.TestCase): self.assertRaises(ValueError, Delays, [0, 0, 0]) - def test_is_valid_direction(self): + def test_is_valid_pointing(self): d = Delays([0, 0, 0]) # should accept base use cases - self.assertTrue(d.is_valid_direction(("J2000", "0rad", "0rad"))) - self.assertTrue(d.is_valid_direction(("J2000", "4.712389rad", "1.570796rad"))) - self.assertTrue(d.is_valid_direction(("AZELGEO", "0rad", "0rad"))) - self.assertTrue(d.is_valid_direction(("AZELGEO", "4.712389rad", "1.570796rad"))) - self.assertTrue(d.is_valid_direction(("SUN", "0rad", "0rad"))) - - # i dont get these either, but casacore accepts them - self.assertTrue(d.is_valid_direction([])) - self.assertTrue(d.is_valid_direction(("J2000",))) - self.assertTrue( - d.is_valid_direction( - ( - "J2000", - "0rad", - ) - ) - ) - self.assertTrue(d.is_valid_direction(("J2000", "0rad", "0rad", "0rad"))) + self.assertTrue(d.is_valid_pointing(("J2000", "0rad", "0rad"))) + self.assertTrue(d.is_valid_pointing(("J2000", "4.712389rad", "1.570796rad"))) + self.assertTrue(d.is_valid_pointing(("AZELGEO", "0rad", "0rad"))) + self.assertTrue(d.is_valid_pointing(("AZELGEO", "4.712389rad", "1.570796rad"))) + self.assertTrue(d.is_valid_pointing(("SUN", "0rad", "0rad"))) + self.assertTrue(d.is_valid_pointing(("None", "", ""))) # should not throw, and return False, on bad uses - self.assertFalse(d.is_valid_direction(("", "", ""))) - self.assertFalse( - d.is_valid_direction(("J2000", "0rad", "0rad", "0rad", "0rad")) - ) - self.assertFalse(d.is_valid_direction((1, 2, 3))) - self.assertFalse(d.is_valid_direction("foo")) - self.assertFalse(d.is_valid_direction(None)) + self.assertFalse(d.is_valid_pointing([])) + self.assertFalse(d.is_valid_pointing(("", "", ""))) + self.assertFalse(d.is_valid_pointing(("J2000", "0rad", "0rad", "0rad", "0rad"))) + self.assertFalse(d.is_valid_pointing((1, 2, 3))) + self.assertFalse(d.is_valid_pointing("foo")) + self.assertFalse(d.is_valid_pointing(None)) def test_sun(self): # # create a frame tied to the reference position @@ -80,10 +67,9 @@ class TestDelays(base.TestCase): d.set_measure_time(timestamp) # point to the sun - direction = "SUN", "0rad", "0rad" + pointing = "SUN", "0rad", "0rad" # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions. - pointing = d.measure.direction(*direction) direction = d.get_direction_vector(pointing) """