Skip to content
Snippets Groups Projects
Select Git revision
  • ded32ac769fe79b08758f975ce817b7ce93c4268
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

urls.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_delays.py 10.84 KiB
    # Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
    # SPDX-License-Identifier: Apache-2.0
    
    import datetime
    import logging
    import statistics
    import time
    
    import casacore
    from unittest import mock
    import numpy
    import numpy.testing
    import threading
    
    from tangostationcontrol.beam.delays import Delays, is_valid_pointing
    from tangostationcontrol.common.constants import MAX_ANTENNA, N_beamlets_ctrl
    
    from test import base
    
    
    class TestDelays(base.TestCase):
        def test_init(self):
            """Fail condition is simply the object creation failing"""
    
            reference_itrf = [
                3826577.066,
                461022.948,
                5064892.786,
            ]  # CS002LBA, in ITRF2005 epoch 2012.5
            d = Delays(reference_itrf)
    
            d.stop()
    
        def test_init_fails(self):
            """Test do_measure returning false is correctly caught"""
    
            with mock.patch.object(casacore.measures, "measures") as m_measure:
                m_measure.return_value.do_frame.return_value = False
    
                self.assertRaises(ValueError, Delays, [0, 0, 0])
    
        def test_is_valid_pointing(self):
            # should accept base use cases
            self.assertTrue(is_valid_pointing(("J2000", "0rad", "0rad")))
            self.assertTrue(is_valid_pointing(("J2000", "4.712389rad", "1.570796rad")))
            self.assertTrue(is_valid_pointing(("AZELGEO", "0rad", "0rad")))
            self.assertTrue(is_valid_pointing(("AZELGEO", "4.712389rad", "1.570796rad")))
            self.assertTrue(is_valid_pointing(("SUN", "0rad", "0rad")))
            self.assertTrue(is_valid_pointing(("None", "", "")))
    
            # should not throw, and return False, on bad uses
            self.assertFalse(is_valid_pointing(()))
            self.assertFalse(is_valid_pointing(("", "", "")))
            self.assertFalse(is_valid_pointing(("J2000", "0rad", "0rad", "0rad", "0rad")))
            self.assertFalse(is_valid_pointing((1, 2, 3)))
            self.assertFalse(is_valid_pointing("foo"))
            self.assertFalse(is_valid_pointing(None))
    
        def test_sun(self):
            # # create a frame tied to the reference position
            reference_itrf = [3826577.066, 461022.948, 5064892.786]
            d = Delays(reference_itrf)
    
            try:
                for i in range(24):
                    # set the time to the day of the winter solstice 2021 (21 december 16:58) as this is the time with the least change in sunlight
                    timestamp = datetime.datetime(2021, 12, 21, i, 58, 0)
                    d.set_measure_time(timestamp)
    
                    # point to the sun
                    pointing = "SUN", "0rad", "0rad"
    
                    # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions.
                    direction = d.get_direction_vector(pointing)
    
                    """
                    direction[2] is the z-coordinate of ITRF, which points to the north pole.
                    This direction is constant when pointing to the sun, as the earth rotates around its axis,
                    but changes slowly due to the earths rotation around the sun.
                    The summer and winter solstices are when these values are at their peaks and the changes are the smallest.
                    This test takes the value at the winter solstice and checks whether the measured values are near enough to that.
                    """
    
                    # Measured manually at the winter solstice. Using datetime.datetime(2021, 12, 21, 16, 58, 0)
                    z_at_solstice = -0.3977784695213487
                    z_direction = direction[2]
    
                    self.assertAlmostEqual(z_at_solstice, z_direction, 4)
            finally:
                d.stop()
    
        def test_identical_location(self):
            # # create a frame tied to the reference position
            reference_itrf = [
                3826577.066,
                461022.948,
                5064892.786,
            ]  # CS002LBA, in ITRF2005 epoch 2012.5
            d = Delays(reference_itrf)
    
            try:
                # set the antenna position identical to the reference position
                relative_antenna_itrf = [[0.0, 0.0, 0.0]]
    
                # # set the timestamp to solve for
                timestamp = datetime.datetime(2000, 1, 1, 0, 0, 0)
                d.set_measure_time(timestamp)
    
                # compute the delays for an antennas w.r.t. the reference position
    
                # # obtain the direction vector for a specific pointing
                direction = "J2000", "0rad", "0rad"
    
                # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions.
                delays = d.delays(direction, relative_antenna_itrf)
    
                self.assertListEqual(delays.tolist(), [0.0], msg=f"delays = {delays}")
            finally:
                d.stop()
    
        def test_regression(self):
            reference_itrf = [
                3826577.066,
                461022.948,
                5064892.786,
            ]  # CS002LBA, in ITRF2005 epoch 2012.5
            d = Delays(reference_itrf)
    
            try:
                # set the antenna position identical to the reference position
                antenna_itrf = numpy.array(
                    [[3826923.503, 460915.488, 5064643.517]]
                )  # CS001LBA, in ITRF2005 epoch 2012.5
    
                relative_antenna_itrf = antenna_itrf - reference_itrf
    
                # # set the timestamp to solve for
                timestamp = datetime.datetime(2000, 1, 1, 0, 0, 0)
                d.set_measure_time(timestamp)
    
                # # obtain the direction vector for a specific pointing
                direction = "J2000", "0rad", "1.570796rad"
    
                # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions.
                delays = d.delays(direction, relative_antenna_itrf)
    
                # check for regression
                self.assertAlmostEqual(-8.31467564781444e-07, delays[0], delta=1.0e-10)
            finally:
                d.stop()
    
        def test_light_second_delay(self):
            """
            This test measures whether the distance between 2 positions is 0.1 light second apart.
            """
    
            reference_itrf = [0, 0, 0]
            d = Delays(reference_itrf)
    
            try:
                # set the antenna position 0.1 lightsecond in the Z direction of the ITRF,
                # which is aligned with the North Pole, see
                # https://en.wikipedia.org/wiki/Earth-centered,_Earth-fixed_coordinate_system#Structure
                speed_of_light = 299792458.0
                relative_antenna_itrf = [[0, 0, 0.1 * speed_of_light]]
    
                # We need to point along the same direction in order to have the delay reflect the distance.
                #
                # We point at the North Celestial Pole in J2000, which is always at 90 degrees declanation,
                # see https://gssc.esa.int/navipedia/index.php/Conventional_Celestial_Reference_System
                timestamp = datetime.datetime(
                    2022, 3, 1, 0, 0, 0
                )  # timestamp does not actually matter, but casacore doesn't know that.
                d.set_measure_time(timestamp)
                direction = "J2000", "0rad", "1.570796rad"
    
                # calculate the delays based on the set reference position, the set time and now the set direction and antenna positions.
                delays = d.delays(direction, relative_antenna_itrf)
    
                self.assertAlmostEqual(0.1, delays[0], 6, f"delays[0] = {delays[0]}")
            finally:
                d.stop()
    
    
    class TestDelaysBulk(base.TestCase):
        @staticmethod
        def makeDelays():
            d = Delays([0, 0, 0])
            timestamp = datetime.datetime(
                2022, 3, 1, 0, 0, 0
            )  # timestamp does not actually matter, but casacore doesn't know that.
            d.set_measure_time(timestamp)
    
            return d
    
        def setUp(self):
            self.d = self.makeDelays()
            self.addCleanup(self.d.stop)
    
            # generate different positions and directions
            self.positions = numpy.array([[i, 2, 3] for i in range(MAX_ANTENNA)])
            self.directions = numpy.array(
                [
                    ["J2000", f"{i*numpy.pi/180}rad", f"{i*numpy.pi/180}rad"]
                    for i in range(N_beamlets_ctrl)
                ]
            )
    
        def test_delays_bulk(self):
            bulk_result = self.d.delays_bulk(self.directions, self.positions)
    
            # verify parallellisation along direction axis
            for i, single_dir in enumerate(self.directions):
                single_dir_result = self.d.delays_bulk([single_dir], self.positions)
                numpy.testing.assert_almost_equal(
                    single_dir_result[:, 0], bulk_result[:, i], 4
                )
    
            # verify parallellisation along position axis
            for i, single_pos in enumerate(self.positions):
                single_pos_result = self.d.delays_bulk(self.directions, [single_pos])
                numpy.testing.assert_almost_equal(
                    single_pos_result[0, :], bulk_result[i, :], 4
                )
    
        def test_delays_bulk_speed(self):
            count = 10
            before = time.monotonic_ns()
            for _ in range(count):
                _ = self.d.delays_bulk(self.directions, self.positions)
            after = time.monotonic_ns()
            logging.error(
                f"delays bulk averaged {(after - before) / count / 1e6} ms to convert 488 directions for 96 antennas."
            )
    
        def test_delays_bulk_parallel_speed(self):
            count = 10
            nr_threads = 3
            duration_results_ms: list[float] = []
    
            def run_delays_bulk():
                # make sure each thread runs its own instance to allow parallellism
                # between instances
                d = self.makeDelays()
    
                try:
                    before = time.monotonic_ns()
                    for _ in range(count):
                        _ = d.delays_bulk(self.directions, self.positions)
                    after = time.monotonic_ns()
    
                    duration_results_ms.append((after - before) / count / 1e6)
                finally:
                    d.stop()
    
            # measure single-threaded performance first
            for _ in range(nr_threads):
                run_delays_bulk()
            single_thread_execution_time = statistics.median(duration_results_ms)
    
            logging.error(
                f"delays bulk single-threaded cost averages at {single_thread_execution_time} ms per call to convert 488 directions for 96 antennas {count} times."
            )
    
            # compare against multi-threaded performance
            duration_results_ms = []
            threads = [threading.Thread(target=run_delays_bulk) for _ in range(nr_threads)]
            [t.start() for t in threads]
            [t.join() for t in threads]
    
            logging.error(
                f"delays bulk multi-threaded averages are {[int(d) for d in duration_results_ms]} ms per call to convert 488 directions for 96 antennas {count} times using {nr_threads} threads."
            )
    
            # as we have a real time system and sufficient cores, we care
            # most about the worst case for a thread. We test for full
            # parallellism, plus some tolerance.
            self.assertLess(
                max(duration_results_ms),
                single_thread_execution_time * 1.50,  # 50% tolerance
            )