Skip to content
Snippets Groups Projects
Select Git revision
  • 6c85f209aa2ee95ed493e4e2b7b037e7b33c71c9
  • master default protected
  • test-pytango-10.0.3
  • revert-cs032-ccd-ip
  • deploy-components-parallel
  • fix-chrony-exporter
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • v0.55.5-r2 protected
  • v0.52.8-rc1 protected
  • v0.55.5 protected
  • v0.55.4 protected
  • 0.55.2.dev0
  • 0.55.1.dev0
  • 0.55.0.dev0
  • v0.54.0 protected
  • 0.53.2.dev0
  • 0.53.1.dev0
  • v0.52.3-r2 protected
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
41 results

test_device_observation_field.py

Blame
  • Jan David Mol's avatar
    Resolve L2SS-1509 "Move grafana rpc to opah"
    Jan David Mol authored
    f2c7ffd5
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_device_observation_field.py 14.11 KiB
    #  Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
    #  SPDX-License-Identifier: Apache-2.0
    
    import json
    from datetime import datetime
    from json import loads
    
    import numpy
    
    from integration_tests.device_proxy import TestDeviceProxy
    from integration_tests.default.devices.base import TestDeviceBase
    
    from tango import DevState
    from tango import DevFailed
    from tangostationcontrol.common.constants import (
        N_beamlets_ctrl,
        N_elements,
        N_pn,
        N_pol,
        MAX_ANTENNA,
        MAX_PARALLEL_SUBBANDS,
        CS001_TILES,
    )
    from tangostationcontrol.devices.base_device_classes.antennafield_device import (
        AntennaStatus,
        AntennaUse,
    )
    
    from tangostationcontrol.test.dummy_observation_settings import (
        get_observation_settings_hba_core_immediate,
    )
    from tangostationcontrol.common.env_decorators import (
        restore_properties_for_devices,
    )
    
    
    class TestDeviceObservationField(TestDeviceBase):
        """Integration tests for the ``STAT/ObservationField/1`` device.

        Extends the generic device tests inherited from ``TestDeviceBase``
        with checks that observation field settings are exposed as attributes
        and applied to the other station devices.
        """

        # Explicitly mark this class as collectable by the test runner.
        # NOTE(review): presumably TestDeviceBase opts out of collection — confirm.
        __test__ = True
    
        # Flattened, stringified pairs (a, b) for a in 0..3 and b in 0..5:
        # "0","0", "0","1", ... "3","4", "3","5" (48 entries in total).
        # NOTE(review): presumably these are (FPGA, input) pairs — confirm.
        ANTENNA_TO_SDP_MAPPING = [
            str(index)
            for first_nr in range(4)
            for second_nr in range(6)
            for index in (first_nr, second_nr)
        ]
        # Antenna field device that setUp connects to and configures.
        antennafield_name = "STAT/AFH/HBA0"
    
        # We override this test to skip it, as Observation does not allow
        # writing settings anymore when in ON state.
        def test_device_write_all_attributes(self):
            pass
    
        @classmethod
        def setUpClass(cls):
            """Create the ``STAT/ObservationControl/1`` test device for this class."""
            TestDeviceProxy("STAT/ObservationControl/1").create_test_device()
    
        @classmethod
        def tearDownClass(cls):
            """Destroy the ``STAT/ObservationControl/1`` test device again."""
            TestDeviceProxy("STAT/ObservationControl/1").destroy_test_device()
    
        @restore_properties_for_devices([antennafield_name])
        def setUp(self):
            """Connect to the device under test and to the devices it drives.

            Stores the settings of the first antenna field of the dummy HBA
            core observation as ``self.VALID_JSON`` and writes them to the
            device, so every test starts with valid settings loaded.
            """
            super().setUp("STAT/ObservationField/1")

            # Round-trip through JSON to extract only the first antenna field
            observation = json.loads(
                get_observation_settings_hba_core_immediate().to_json()
            )
            first_antenna_field = observation["antenna_fields"][0]
            self.VALID_JSON = json.dumps(first_antenna_field)

            # Proxies are set up in this fixed order
            self.recv_proxy = self.setup_proxy("STAT/RECVH/H0", defaults=True)
            self.sdpfirmware_proxy = self.setup_proxy("STAT/SDPFirmware/HBA0")
            self.sdp_proxy = self.setup_proxy("STAT/SDP/HBA0")
            self.sst_proxy = self.setup_proxy("STAT/SST/HBA0")
            self.xst_proxy = self.setup_proxy("STAT/XST/HBA0")
            # NOTE(review): cb is presumably invoked by setup_proxy to
            # configure the device properties — confirm in the base class.
            self.antennafield_proxy = self.setup_proxy(
                self.antennafield_name,
                cb=self.antennafield_configure,
            )
            self.beamlet_proxy = self.setup_proxy("STAT/Beamlet/HBA0", defaults=True)
            self.digitalbeam_proxy = self.setup_proxy(
                "STAT/DigitalBeam/HBA0",
                defaults=True,
            )
            self.tilebeam_proxy = self.setup_proxy("STAT/TileBeam/HBA0", defaults=True)

            # Load the settings up front, so that tests (including the ones
            # inherited from the base class) can call self.proxy.initialise()
            # and self.proxy.on() right away.
            self.proxy.observation_field_settings_RW = self.VALID_JSON
    
        @staticmethod
        def antennafield_configure(proxy: TestDeviceProxy):
            """Write the antenna field properties used by these tests.

            :param proxy: proxy of the antenna field device; it receives all
                properties in a single ``put_property`` call.
            """
            tiles = range(CS001_TILES)

            # One [1, n] pair per tile, flattened: even n for power, odd n
            # for control.
            power_flat = numpy.array([[1, 2 * tile] for tile in tiles]).flatten()
            control_flat = numpy.array([[1, 2 * tile + 1] for tile in tiles]).flatten()

            # Every antenna is marked OK and AUTO
            all_ok = numpy.array([AntennaStatus.OK] * MAX_ANTENNA)
            all_auto = numpy.array([AntennaUse.AUTO] * MAX_ANTENNA)

            # Tile n selects element n modulo 16
            single_elements = [str(tile % 16) for tile in tiles]

            properties = {
                "Power_to_RECV_mapping": power_flat,
                "Control_to_RECV_mapping": control_flat,
                "Antenna_to_SDP_Mapping": TestDeviceObservationField.ANTENNA_TO_SDP_MAPPING,
                "Antenna_Status": all_ok,
                "Antenna_Use": all_auto,
                "Antenna_Sets": ["INNER", "OUTER", "SPARSE_EVEN", "SPARSE_ODD", "ALL"],
                # One mask string per entry of Antenna_Sets, in the same order
                "Antenna_Set_Masks": [
                    "111111111111111111111111111111111111111111111111000000000000000000000000000000000000000000000000",
                    "000000000000000000000000000000000000000000000000111111111111111111111111111111111111111111111111",
                    "101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010",
                    "010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101",
                    "111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111",
                ],
                "HBAT_single_element_selection_GENERIC_201512": single_elements,
            }
            proxy.put_property(properties)
    
        def test_device_boot(self):
            """Run the inherited warm boot test (off -> on) with valid settings"""
            valid_settings = self.VALID_JSON
            self.proxy.observation_field_settings_RW = valid_settings
            super().test_device_boot()
    
        def test_device_initialize(self):
            """Run the inherited transition-to-standby test with valid settings"""
            valid_settings = self.VALID_JSON
            self.proxy.observation_field_settings_RW = valid_settings
            super().test_device_initialize()
    
        def test_device_on(self):
            """Run the inherited off -> standby -> on test with valid settings"""
            valid_settings = self.VALID_JSON
            self.proxy.observation_field_settings_RW = valid_settings
            super().test_device_on()
    
        def test_device_read_all_attributes(self):
            """Run the inherited read-all-attributes test with valid settings.

            The inherited test covers the reading logic of all exposed
            attributes in the ON state."""
            valid_settings = self.VALID_JSON
            self.proxy.observation_field_settings_RW = valid_settings
            super().test_device_read_all_attributes()
    
        def test_init_valid(self):
            """Initialising with the valid JSON loaded in setUp reaches STANDBY"""

            self.proxy.Initialise()

            state_after_init = self.proxy.state()
            self.assertEqual(DevState.STANDBY, state_after_init)
    
        def test_init_no_settings(self):
            """Initialising fails after an attempt to load _invalid_ JSON"""

            # The device refuses the invalid settings
            with self.assertRaises(DevFailed):
                self.proxy.observation_field_settings_RW = "{}"

            # Initialisation is expected to be refused as well
            with self.assertRaises(DevFailed):
                self.proxy.Initialise()

            # The failed initialisation is expected to leave the device in FAULT
            state_after_failure = self.proxy.state()
            self.assertEqual(DevState.FAULT, state_after_failure)
    
        def test_init_invalid(self):
            """Writing _invalid_ JSON is rejected and leaves the device OFF"""

            # The device refuses the invalid settings
            with self.assertRaises(DevFailed):
                self.proxy.observation_field_settings_RW = "{}"

            state_after_write = self.proxy.state()
            self.assertEqual(DevState.OFF, state_after_write)
    
        def test_prohibit_rewriting_settings(self):
            """Once initialised, the device refuses new observation settings"""

            self.proxy.Initialise()

            settings_attribute = "observation_field_settings_RW"
            with self.assertRaises(DevFailed):
                self.proxy.write_attribute(settings_attribute, self.VALID_JSON)
    
        def test_attribute_match(self):
            """Test that the values in the JSON settings show up in the attributes"""

            # failing
            settings = loads(self.VALID_JSON)
            first_sap = settings["SAPs"][0]
            sap_pointing = first_sap["pointing"]
            hba_settings = settings["HBA"]
            hba_tile_beam = hba_settings["tile_beam"]
            dithering_settings = settings["dithering"]

            # NOTE(review): for a stop_time without UTC offset, timestamp()
            # interprets the value as local time — confirm the device agrees.
            expected_stop = datetime.fromisoformat(settings["stop_time"]).timestamp()
            expected_obs_id = settings["observation_id"]
            expected_antenna_set = settings["antenna_set"]
            expected_filter = settings["filter"]
            expected_subbands = first_sap["subbands"]
            # The pointing of the SAP is repeated once per subband
            expected_saps_pointing = [
                (
                    sap_pointing["direction_type"],
                    f"{sap_pointing['angle1']}rad",
                    f"{sap_pointing['angle2']}rad",
                )
            ] * len(expected_subbands)
            expected_tile_beam = [
                str(hba_tile_beam["direction_type"]),
                f"{hba_tile_beam['angle1']}rad",
                f"{hba_tile_beam['angle2']}rad",
            ]
            expected_first_beamlet = settings["first_beamlet"]

            expected_dithering = dithering_settings["enabled"]
            expected_dithering_power = dithering_settings["power"]
            expected_dithering_frequency = dithering_settings["frequency"]

            expected_dab_filter = hba_settings["DAB_filter"]

            self.proxy.Initialise()
            self.proxy.On()

            device = self.proxy
            self.assertEqual(DevState.ON, device.state())
            self.assertEqual(expected_stop, device.stop_time_R)
            self.assertEqual(expected_obs_id, device.observation_id_R)
            self.assertEqual(expected_antenna_set, device.antenna_set_R)
            self.assertEqual(expected_filter, device.filter_R)
            self.assertListEqual(expected_subbands, device.saps_subband_R.tolist())
            self.assertListEqual(expected_saps_pointing, list(device.saps_pointing_R))
            self.assertEqual(expected_dab_filter, device.HBA_DAB_filter_R)
            self.assertListEqual(expected_tile_beam, list(device.HBA_tile_beam_R))
            self.assertEqual(expected_first_beamlet, device.first_beamlet_R)

            self.assertEqual(expected_dithering, device.dithering_enabled_R)
            self.assertEqual(expected_dithering_power, device.dithering_power_R)
            self.assertEqual(
                expected_dithering_frequency, device.dithering_frequency_R
            )
    
        def test_apply_antennafield_settings(self):
            """Test that the filter setting ends up in the RCU band selection"""
            self.setup_proxy("STAT/StationManager/1")

            field = self.antennafield_proxy

            # Start from a known band selection, so a change can be detected
            initial_bands = [[0, 0]] * CS001_TILES
            field.RCU_band_select_RW = initial_bands
            self.assertListEqual(field.RCU_band_select_RW.tolist(), initial_bands)

            self.proxy.Initialise()
            self.proxy.On()

            # we request every antenna to be in this band, regardless of mask
            requested_bands = [[2, 2]] * CS001_TILES
            self.assertListEqual(field.RCU_band_select_RW.tolist(), requested_bands)
    
        def test_apply_subbands(self):
            """Test that the SAP subbands end up in the beamlet subband selection"""
            beamlets = self.beamlet_proxy

            # Start from an all-zero selection, so a change can be detected
            cleared = [0] * N_beamlets_ctrl
            beamlets.subband_select_RW = cleared
            self.assertListEqual(beamlets.subband_select_RW.tolist(), cleared)

            self.proxy.Initialise()
            self.proxy.On()

            # Three subbands are expected up front, the rest stays untouched
            applied = [10, 20, 30]
            untouched = [0] * (N_beamlets_ctrl - 3)
            self.assertListEqual(
                beamlets.subband_select_RW.tolist(), applied + untouched
            )
    
        def test_apply_pointing(self):
            """Test that attribute sap pointing is correctly applied

            Every pointing read back from the digital beam is compared: the
            first three are expected to carry the observation pointing, all
            others the default pointing written at the start of the test.
            """
            digitalbeam_proxy = self.digitalbeam_proxy
            default_pointing = [("AZELGEO", "0rad", "1.570796rad")] * N_beamlets_ctrl
            digitalbeam_proxy.Pointing_direction_RW = default_pointing
            self.assertListEqual(
                list(digitalbeam_proxy.Pointing_direction_RW), default_pointing
            )
            self.proxy.Initialise()
            self.proxy.On()

            # number of pointings the observation is expected to apply
            nr_applied = 3

            expected_pointing = [("J2000", "0.0261799rad", "0rad")] * nr_applied + (
                default_pointing[nr_applied:]
            )

            # Read the attribute once, and compare what the device reports for
            # every entry instead of replicating the first entry only.
            actual_pointing = [
                tuple(pointing) for pointing in digitalbeam_proxy.Pointing_direction_RW
            ]

            # The first angle of an applied pointing is cut to 9 characters
            # before comparing, so the comparison does not depend on any
            # further digits the device reports.
            applied_pointing = [
                (direction_type, angle1[:9] + "rad", angle2)
                for direction_type, angle1, angle2 in actual_pointing[:nr_applied]
            ]
            digitalbeam_pointing = applied_pointing + actual_pointing[nr_applied:]

            # show the full diff of these long lists on failure
            self.maxDiff = None

            self.assertListEqual(
                digitalbeam_pointing,
                expected_pointing,
            )
    
        def test_apply_tilebeam(self):
            # failing
            """Test that the HBA tile beam pointing ends up in the tile beam"""
            tilebeam = self.tilebeam_proxy

            # Start from a known pointing, so a change can be detected
            tilebeam.Pointing_direction_RW = [("J2000", "0rad", "0rad")] * CS001_TILES
            self.assertListEqual(
                list(tilebeam.Pointing_direction_RW[0]), ["J2000", "0rad", "0rad"]
            )

            self.proxy.Initialise()
            self.proxy.On()

            # Only the pointing of the first tile is checked. Its first angle
            # is cut to 9 characters before comparing.
            first_tile = list(tilebeam.Pointing_direction_RW[0])
            first_tile[1] = f"{first_tile[1][:9]}rad"

            self.assertListEqual(first_tile, ["J2000", "0.0261799rad", "0rad"])
    
        def test_apply_element_selection(self):
            # failing
            """Test that the element selection ends up in the HBAT power settings"""
            field = self.antennafield_proxy

            # Start with everything powered and all LNAs off, so that both
            # switching on and switching off can be detected
            flat_shape = (CS001_TILES, N_elements * N_pol)
            field.HBAT_PWR_on_RW = numpy.ones(flat_shape, dtype=bool)
            field.HBAT_PWR_LNA_on_RW = numpy.zeros(flat_shape, dtype=bool)

            self.proxy.Initialise()
            self.proxy.On()

            # For tile n, element n modulo 16 is expected to be selected,
            # for all polarisations
            selection = numpy.zeros((CS001_TILES, N_elements, N_pol), dtype=bool)
            tile_nrs = numpy.arange(CS001_TILES)
            selection[tile_nrs, tile_nrs % 16, :] = True

            # collapse all values for a tile into a single dimension,
            # like HBAT_PWR_(LNA_)on_RW will return
            expected = selection.reshape(CS001_TILES, -1).tolist()

            self.assertListEqual(field.HBAT_PWR_on_RW.tolist(), expected)
            self.assertListEqual(field.HBAT_PWR_LNA_on_RW.tolist(), expected)
    
        def test_xst_settings(self):
            """Test that the observation configures the XSTs"""

            xst = self.xst_proxy

            # reset XST configuration to detect changes
            blank_selection = [0] * (1 + MAX_PARALLEL_SUBBANDS)
            xst.FPGA_xst_subband_select_RW = [blank_selection] * N_pn
            xst.FPGA_xst_integration_interval_RW = [10.0] * N_pn
            xst.FPGA_xst_offload_nof_crosslets_RW = [0] * N_pn

            self.proxy.Initialise()
            self.proxy.On()

            # The same settings are expected for every one of the N_pn entries
            expected_selection = [5, 1, 2, 3, 4, 5, 0, 0] * N_pn
            expected_intervals = [1.0] * N_pn
            expected_crosslets = [5] * N_pn

            self.assertListEqual(
                expected_selection,
                xst.FPGA_xst_subband_select_RW.flatten().tolist(),
            )
            self.assertListEqual(
                expected_intervals,
                xst.FPGA_xst_integration_interval_RW.tolist(),
            )
            self.assertListEqual(
                expected_crosslets,
                xst.FPGA_xst_offload_nof_crosslets_RW.tolist(),
            )