diff --git a/tangostationcontrol/tangostationcontrol/beam/delays.py b/tangostationcontrol/tangostationcontrol/beam/delays.py index b1d7f9976ec5de3e75784dac484c9c33584f2806..7c4b13ba1cdf74b72565e5aa22cae631c7242e16 100644 --- a/tangostationcontrol/tangostationcontrol/beam/delays.py +++ b/tangostationcontrol/tangostationcontrol/beam/delays.py @@ -2,11 +2,11 @@ import casacore.measures import numpy import datetime -def subtract(a, b): +def subtract(a, b) -> numpy.ndarray: return numpy.array([x - y for x, y in zip(a, b)]) -class delay_calculator: +class Delays: def __init__(self, itrf: list([float])): """ Create a measure object, configured for the specified terrestrial location. """ @@ -14,8 +14,8 @@ class delay_calculator: measure = casacore.measures.measures() frame_location = measure.position("ITRF", *[f"{x}m" for x in itrf]) - result = measure.do_frame(frame_location) - assert result == True + if not measure.do_frame(frame_location): + raise ValueError(f"measure.do_frame failed for ITRF location {itrf}") self.reference_itrf = itrf self.measure = measure @@ -23,10 +23,11 @@ class delay_calculator: def set_measure_time(self, utc_time: datetime.datetime): """ Configure the measure object for the specified time. """ - frame_time = self.measure.epoch("UTC", utc_time.isoformat(' ')) + utc_time_str = utc_time.isoformat(' ') + frame_time = self.measure.epoch("UTC", utc_time_str) - result = self.measure.do_frame(frame_time) - assert result == True + if not self.measure.do_frame(frame_time): + raise ValueError(f"measure.do_frame failed for UTC time {utc_time_str}") def get_direction_vector(self, pointing: numpy.ndarray) -> numpy.ndarray: """ Compute direction vector for a given pointing, relative to the measure. """ @@ -53,7 +54,8 @@ class delay_calculator: # Return array [directions][angles] return direction_vectors.T - def is_valid_direction(self, direction): + def is_valid_direction(self, direction) -> bool: + """ Check validity of the direction measure """ try: _ = self.measure.direction(*direction) except (RuntimeError, TypeError) as e: @@ -61,14 +63,14 @@ class delay_calculator: return True - def convert(self, direction, antenna_absolute_itrf: list([float])): + def delays(self, direction, antenna_absolute_itrf: list([float])) -> numpy.ndarray: """ Get the delays for a direction and *absolute* antenna positions. Returns delays[antenna]. """ - return self.convert_bulk([direction], numpy.array(antenna_absolute_itrf) - self.reference_itrf).flatten() + return self.delays_bulk([direction], numpy.array(antenna_absolute_itrf) - self.reference_itrf).flatten() - def convert_bulk(self, directions: numpy.ndarray, antenna_relative_itrfs: numpy.ndarray) -> numpy.ndarray: + def delays_bulk(self, directions: numpy.ndarray, antenna_relative_itrfs: numpy.ndarray) -> numpy.ndarray: """ Get the delays for each direction and each *relative* antenna position. Returns delays[antenna][direction]. """ diff --git a/tangostationcontrol/tangostationcontrol/devices/beam_device.py b/tangostationcontrol/tangostationcontrol/devices/beam_device.py index c8ccf78e649b194b9efd0a2e97231310cf091581..46f8ef4e01c56c4f88c3a8566afa33a85e354dfe 100644 --- a/tangostationcontrol/tangostationcontrol/devices/beam_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/beam_device.py @@ -24,7 +24,7 @@ from tangostationcontrol.common.measures import get_measures_directory, get_avai from tangostationcontrol.common.lofar_logging import log_exceptions from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES from tangostationcontrol.devices.device_decorators import TimeIt, only_in_states, fault_on_error -from tangostationcontrol.beam.delays import delay_calculator +from tangostationcontrol.beam.delays import Delays from tangostationcontrol.devices.lofar_device import lofar_device __all__ = ["beam_device", "main", "BeamTracker"] @@ -219,7 +219,7 @@ class beam_device(lofar_device): self.Beam_tracker = None # generic delay calculator to ask about validity of settings - self.generic_delay_calculator = delay_calculator([0, 0, 0]) + self.generic_delay_calculator = Delays([0, 0, 0]) # Derived classes will override this with a non-parameterised # version that lofar_device will call. diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py index 256d156a4337bffb46e2ed085cbcc151d6d4f5b2..f950e1f600169fdd1671d4d0d679c837e8d0c909 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py @@ -17,7 +17,7 @@ from tangostationcontrol.devices.beam_device import beam_device from tangostationcontrol.devices.sdp.beamlet import Beamlet from tangostationcontrol.devices.device_decorators import TimeIt from tangostationcontrol.common.lofar_logging import log_exceptions -from tangostationcontrol.beam.delays import delay_calculator +from tangostationcontrol.beam.delays import Delays import numpy import datetime @@ -137,7 +137,7 @@ class DigitalBeam(beam_device): input_itrf[input_nr] = antenna_itrf[antenna_nr] # a delay calculator - self.delay_calculator = delay_calculator(reference_itrf) + self.delay_calculator = Delays(reference_itrf) # relative positions of each antenna self.relative_input_positions = input_itrf - reference_itrf @@ -165,7 +165,7 @@ class DigitalBeam(beam_device): d.set_measure_time(timestamp) # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions - delays = d.convert_bulk(pointing_direction, self.relative_input_positions) + delays = d.delays_bulk(pointing_direction, self.relative_input_positions) return delays diff --git a/tangostationcontrol/tangostationcontrol/devices/tilebeam.py b/tangostationcontrol/tangostationcontrol/devices/tilebeam.py index d8e98ad75c550ed85b5d6a23cc0c8c7543637011..a98e88a764461b1b1b7cd1b90a0feed8975e175d 100644 --- a/tangostationcontrol/tangostationcontrol/devices/tilebeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/tilebeam.py @@ -16,7 +16,7 @@ from tango import Util # Additional import from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions -from tangostationcontrol.beam.delays import delay_calculator +from tangostationcontrol.beam.delays import Delays from tangostationcontrol.beam.hba_tile import NUMBER_OF_ELEMENTS_PER_TILE from tangostationcontrol.devices.beam_device import beam_device from tangostationcontrol.devices.device_decorators import TimeIt @@ -61,7 +61,7 @@ class TileBeam(beam_device): HBAT_antenna_itrf_offsets = self.antennafield_proxy.HBAT_antenna_itrf_offsets_R.reshape(self._nr_tiles, NUMBER_OF_ELEMENTS_PER_TILE, 3) # a delay calculator for each tile - self.HBAT_delay_calculators = [delay_calculator(reference_itrf) for reference_itrf in HBAT_reference_itrf] + self.HBAT_delay_calculators = [Delays(reference_itrf) for reference_itrf in HBAT_reference_itrf] # absolute positions of each antenna element self.HBAT_antenna_positions = [HBAT_reference_itrf[tile] + HBAT_antenna_itrf_offsets[tile] for tile in range(self._nr_tiles)] @@ -85,7 +85,7 @@ class TileBeam(beam_device): d.set_measure_time(timestamp) # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions - delays[tile] = d.convert(pointing_direction[tile], self.HBAT_antenna_positions[tile]) + delays[tile] = d.delays(pointing_direction[tile], self.HBAT_antenna_positions[tile]) return delays diff --git a/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py b/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py index 12d015774c933322495fc3aa7abe8d5c57052628..0a779f980eabd5cc08d95f0521e48acceefee2f5 100644 --- a/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py +++ b/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py @@ -1,26 +1,35 @@ import datetime import time import logging +import mock + import numpy import numpy.testing +import casacore -from tangostationcontrol.beam.delays import delay_calculator +from tangostationcontrol.beam.delays import Delays from tangostationcontrol.test import base class TestDelays(base.TestCase): def test_init(self): - """ - Fail condition is simply the object creation failing - """ + """Fail condition is simply the object creation failing""" reference_itrf = [3826577.066, 461022.948, 5064892.786] # CS002LBA, in ITRF2005 epoch 2012.5 - d = delay_calculator(reference_itrf) + d = Delays(reference_itrf) self.assertIsNotNone(d) + def test_init_fails(self): + """Test do_measure returning false is correctly caught""" + + with mock.patch.object(casacore.measures, "measures") as m_measure: + m_measure.return_value.do_frame.return_value = False + + self.assertRaises(ValueError, Delays, [0, 0, 0]) + def test_is_valid_direction(self): - d = delay_calculator([0, 0, 0]) + d = Delays([0, 0, 0]) # should accept base use cases self.assertTrue(d.is_valid_direction(("J2000", "0deg", "0deg"))) @@ -45,7 +54,7 @@ class TestDelays(base.TestCase): def test_sun(self): # # create a frame tied to the reference position reference_itrf = [3826577.066, 461022.948, 5064892.786] - d = delay_calculator(reference_itrf) + d = Delays(reference_itrf) for i in range(24): @@ -78,7 +87,7 @@ class TestDelays(base.TestCase): def test_identical_location(self): # # create a frame tied to the reference position reference_itrf = [3826577.066, 461022.948, 5064892.786] # CS002LBA, in ITRF2005 epoch 2012.5 - d = delay_calculator(reference_itrf) + d = Delays(reference_itrf) # set the antenna position identical to the reference position antenna_itrf = [[reference_itrf[0], reference_itrf[1], reference_itrf[2]]] # CS001LBA, in ITRF2005 epoch 2012.5 @@ -93,13 +102,13 @@ class TestDelays(base.TestCase): direction = "J2000", "0deg", "0deg" # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions. - delays = d.convert(direction, antenna_itrf) + delays = d.delays(direction, antenna_itrf) self.assertListEqual(delays.tolist(), [0.0], msg=f"delays = {delays}") def test_regression(self): reference_itrf = [3826577.066, 461022.948, 5064892.786] # CS002LBA, in ITRF2005 epoch 2012.5 - d = delay_calculator(reference_itrf) + d = Delays(reference_itrf) # set the antenna position identical to the reference position antenna_itrf = [[3826923.503, 460915.488, 5064643.517]] # CS001LBA, in ITRF2005 epoch 2012.5 @@ -112,7 +121,7 @@ class TestDelays(base.TestCase): direction = "J2000", "0deg", "90deg" # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions. - delays = d.convert(direction, antenna_itrf) + delays = d.delays(direction, antenna_itrf) # check for regression self.assertAlmostEqual(-8.31467564781444e-07, delays[0], delta=1.0e-10) @@ -123,7 +132,7 @@ class TestDelays(base.TestCase): """ reference_itrf = [0, 0, 0] - d = delay_calculator(reference_itrf) + d = Delays(reference_itrf) # set the antenna position 0.1 lightsecond in the Z direction of the ITRF, # which is aligned with the North Pole, see @@ -140,12 +149,12 @@ class TestDelays(base.TestCase): direction = "J2000", "0deg", "90deg" # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions. - delays = d.convert(direction, antenna_itrf) + delays = d.delays(direction, antenna_itrf) self.assertAlmostEqual(0.1, delays[0], 6, f"delays[0] = {delays[0]}") - def test_convert_bulk(self): - d = delay_calculator([0, 0, 0]) + def test_delays_bulk(self): + d = Delays([0, 0, 0]) timestamp = datetime.datetime(2022, 3, 1, 0, 0, 0) # timestamp does not actually matter, but casacore doesn't know that. d.set_measure_time(timestamp) @@ -153,20 +162,20 @@ class TestDelays(base.TestCase): positions = numpy.array([[i,2,3] for i in range(5)]) directions = numpy.array([["J2000", f"{i}deg", f"{i}deg"] for i in range(90)]) - bulk_result = d.convert_bulk(directions, positions) + bulk_result = d.delays_bulk(directions, positions) # verify parallellisation along direction axis for i, single_dir in enumerate(directions): - single_dir_result = d.convert_bulk([single_dir], positions) + single_dir_result = d.delays_bulk([single_dir], positions) numpy.testing.assert_almost_equal(single_dir_result[:, 0], bulk_result[:, i], 4) # verify parallellisation along position axis for i, single_pos in enumerate(positions): - single_pos_result = d.convert_bulk(directions, [single_pos]) + single_pos_result = d.delays_bulk(directions, [single_pos]) numpy.testing.assert_almost_equal(single_pos_result[0, :], bulk_result[i, :], 4) - def test_convert_bulk_speed(self): - d = delay_calculator([0, 0, 0]) + def test_delays_bulk_speed(self): + d = Delays([0, 0, 0]) timestamp = datetime.datetime(2022, 3, 1, 0, 0, 0) # timestamp does not actually matter, but casacore doesn't know that. d.set_measure_time(timestamp) @@ -176,6 +185,6 @@ class TestDelays(base.TestCase): count = 10 before = time.monotonic_ns() for _ in range(count): - _ = d.convert_bulk(directions, positions) + _ = d.delays_bulk(directions, positions) after = time.monotonic_ns() - logging.error(f"convert_bulk averaged {(after - before)/count/1e6} ms to convert 488 directions for 96 antennas.") + logging.error(f"delays bulk averaged {(after - before)/count/1e6} ms to convert 488 directions for 96 antennas.")