diff --git a/tangostationcontrol/tangostationcontrol/devices/antennafield.py b/tangostationcontrol/tangostationcontrol/devices/antennafield.py index 22e5f922fbc3853bf29b865cc075b206116e89e2..4c39a6bdea1db44f52c87f2fd3a4e6add71e3d99 100644 --- a/tangostationcontrol/tangostationcontrol/devices/antennafield.py +++ b/tangostationcontrol/tangostationcontrol/devices/antennafield.py @@ -6,11 +6,15 @@ """ AntennaField Device Server for LOFAR2.0 """ +from enum import IntEnum +from math import pi +import numpy + +# PyTango imports from tango import DeviceProxy, DevSource, AttrWriteType, DevVarFloatArray, DevVarLongArray from tango.server import device_property, attribute, command -import numpy -from math import pi +# Additional import from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.devices.lofar_device import lofar_device from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions @@ -26,6 +30,17 @@ __all__ = ["AntennaField", "HBATToRecvMapper", "main"] # Highest number of HBA tiles we support per AntennaField MAX_NUMBER_OF_HBAT = 96 +class AntennaUse(IntEnum): + AUTO = 0 # use antenna only if its OK or SUSPICIOUS + ON = 1 # force antenna to be on, regardless of quality + OFF = 2 # force antenna to be off, regardless of quality + +class AntennaQuality(IntEnum): + OK = 0 + SUSPICIOUS = 1 + BROKEN = 2 + BEYOND_REPAIR = 3 + class mapped_attribute(attribute): def __init__(self, mapping_attribute, dtype, max_dim_x, max_dim_y=0, access=AttrWriteType.READ, **kwargs): @@ -70,6 +85,22 @@ class AntennaField(lofar_device): calculated, as well as the geohash. """ + # ----- Antenna states + + Antenna_Quality = device_property( + doc="Operational quality state of each antenna", + dtype='DevVarUShortArray', + mandatory=False, + default_value = numpy.array([AntennaQuality.OK] * MAX_NUMBER_OF_HBAT) + ) + + Antenna_Use = device_property( + doc="Operational State of each antenna", + dtype='DevVarUShortArray', + mandatory=False, + default_value = numpy.array([AntennaUse.AUTO] * MAX_NUMBER_OF_HBAT) + ) + # ----- Position information Antenna_Field_Reference_ITRF = device_property( @@ -164,6 +195,13 @@ class AntennaField(lofar_device): default_value = [] ) + Antenna_Quality_R = attribute(access=AttrWriteType.READ, + dtype=(numpy.uint16,), max_dim_x=MAX_NUMBER_OF_HBAT) + Antenna_Use_R = attribute(access=AttrWriteType.READ, + dtype=(numpy.uint16,), max_dim_x=MAX_NUMBER_OF_HBAT) + Antenna_Usage_Mask_R = attribute(access=AttrWriteType.READ, + dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT) + HBAT_ANT_mask_RW = mapped_attribute("ANT_mask_RW", dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE) HBAT_BF_delay_steps_R = mapped_attribute("HBAT_BF_delay_steps_R", dtype=((numpy.int64,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT) HBAT_BF_delay_steps_RW = mapped_attribute("HBAT_BF_delay_steps_RW", dtype=((numpy.int64,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE) @@ -209,6 +247,19 @@ class AntennaField(lofar_device): doc='Number of HBAT in this field', dtype=numpy.int32) + def read_Antenna_Use_R(self): + return self.Antenna_Use + + def read_Antenna_Quality_R(self): + return self.Antenna_Quality + + def read_Antenna_Usage_Mask_R(self): + antenna_usage = numpy.zeros(MAX_NUMBER_OF_HBAT, dtype=bool) + for n in range(0, MAX_NUMBER_OF_HBAT): + antenna_usage[n] = (self.read_attribute('Antenna_Use_R')[n] == AntennaUse.ON + or (self.read_attribute('Antenna_Use_R')[n] == AntennaUse.AUTO and self.read_attribute('Antenna_Quality_R')[n] <= AntennaQuality.SUSPICIOUS)) + return antenna_usage + def read_nr_tiles_R(self): # The number of tiles should be equal to: # * the number of elements in the HBAT_Control_to_RECV_mapping (after reshaping), @@ -278,12 +329,6 @@ class AntennaField(lofar_device): def read_HBAT_reference_GEOHASH_R(self): return GEO_to_GEOHASH(self.read_HBAT_reference_GEO_R()) - @log_exceptions() - def configure_for_initialise(self): - super().configure_for_initialise() - self.__setup_all_receiver_proxies() - self.__setup_mapper() - def __setup_all_receiver_proxies(self): self.recv_proxies = [] @@ -317,6 +362,43 @@ class AntennaField(lofar_device): for idx, recv_proxy in enumerate(self.recv_proxies): recv_proxy.write_attribute(mapped_point, mapped_value[idx]) + # -------- + # Overloaded functions + # -------- + + @log_exceptions() + def configure_for_initialise(self): + super().configure_for_initialise() + self.__setup_all_receiver_proxies() + self.__setup_mapper() + + @log_exceptions() + def _prepare_hardware(self): + # Initialise the RCU hardware. + for recv_proxy in self.recv_proxies: + RCU_mask = recv_proxy.RCU_mask_RW + # Set the mask to all Trues + recv_proxy.RCU_mask_RW = [True] * 32 + # Turn off the RCUs + recv_proxy.RCU_off() + + # TODO(Stefano): restore wait attribute + #recv_proxy.wait_attribute("RECVTR_translator_busy_R", False, recv_proxy.RCU_On_Off_timeout) + + # Restore the mask + recv_proxy.RCU_mask_RW = RCU_mask + # Turn on the RCUs + recv_proxy.RCU_on() + + # TODO(Stefano): restore wait attribute + #recv_proxy.wait_attribute("RECVTR_translator_busy_R", False, recv_proxy.RCU_On_Off_timeout) + + @log_exceptions() + def _initialise_hardware(self): + # Disable controlling the tiles that fall outside the mask + # WARN: Needed in configure_for_initialise but Tango does not allow to write attributes in INIT state + self.proxy.write_attribute('HBAT_ANT_mask_RW', self.read_attribute('Antenna_Usage_Mask_R')) + # -------- # Commands # -------- diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py index 0b44a14292c9e2f0e053001a6e071adcd977f978..256d156a4337bffb46e2ed085cbcc151d6d4f5b2 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/digitalbeam.py @@ -99,7 +99,7 @@ class DigitalBeam(beam_device): def write_antenna_select_RW(self, antennas): for input_nr, antenna_nr in enumerate(self.Input_to_Antenna_Mapping): - if antenna_nr >= 0: + if self.antennafield_proxy.Antenna_Usage_Mask_R[input_nr] and antenna_nr >= 0: self._input_select[input_nr] = antennas[antenna_nr] # ---------- @@ -120,6 +120,7 @@ class DigitalBeam(beam_device): self.antennafield_proxy = DeviceProxy(self.AntennaField_Device) self.antennafield_proxy.set_source(DevSource.DEV) + antenna_usage_mask = self.antennafield_proxy.Antenna_Usage_Mask_R self.beamlet_proxy = DeviceProxy(self.Beamlet_Device) self.beamlet_proxy.set_source(DevSource.DEV) @@ -132,7 +133,7 @@ class DigitalBeam(beam_device): # Use reference position for any missing antennas so they always get a delay of 0 input_itrf = numpy.array([reference_itrf] * self.NUM_INPUTS) for input_nr, antenna_nr in enumerate(self.Input_to_Antenna_Mapping): - if antenna_nr >= 0: + if antenna_usage_mask[input_nr] and antenna_nr >= 0: input_itrf[input_nr] = antenna_itrf[antenna_nr] # a delay calculator @@ -142,7 +143,8 @@ class DigitalBeam(beam_device): self.relative_input_positions = input_itrf - reference_itrf # use all antennas in the mapping for all beamlets, unless specified otherwise - input_select = numpy.repeat((numpy.array(self.Input_to_Antenna_Mapping) >= 0), self.NUM_BEAMLETS).reshape(self.NUM_INPUTS, self.NUM_BEAMLETS) + input_select_mask = numpy.logical_and(numpy.array(self.Input_to_Antenna_Mapping) >= 0, antenna_usage_mask) + input_select = numpy.repeat(input_select_mask, self.NUM_BEAMLETS).reshape(self.NUM_INPUTS, self.NUM_BEAMLETS) self.write_input_select_RW(input_select) # -------- diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_antennafield.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_antennafield.py index 2b34767d6c74df2199017e1a884a0e2165af7fb0..f14a2838d489b3f40a03186d565b56d75190f950 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_antennafield.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_antennafield.py @@ -7,8 +7,10 @@ # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. -from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy +import numpy +from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy +from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse from .base import AbstractTestBases class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): @@ -30,3 +32,41 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase): def test_property_recv_devices_has_one_receiver(self): result = self.proxy.get_property("RECV_devices") self.assertSequenceEqual(result["RECV_devices"], ["STAT/RECV/1"]) + + def test_HBAT_ANT_mask_RW_configured_after_Antenna_Usage_Mask(self): + """ Verify if HBAT_ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values """ + recv_proxy = self.setup_recv_proxy() + antennafield_proxy = self.proxy + numpy.testing.assert_equal(numpy.array([True] * 96), recv_proxy.ANT_mask_RW) + + antenna_qualities = numpy.array([AntennaQuality.OK] * 96) + antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * 95) + antenna_properties = {'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use} + mapping_properties = {"RECV_devices": ["STAT/RECV/1"], + "HBAT_Power_to_RECV_mapping": [-1, -1] * 48, + "HBAT_Control_to_RECV_mapping": [1, 0 , 1, 1] + [-1, -1] * 46} + antennafield_proxy.off() + antennafield_proxy.put_property(antenna_properties) + antennafield_proxy.put_property(mapping_properties) + antennafield_proxy.boot() # initialise hardware values as well + numpy.testing.assert_equal(numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R) + numpy.testing.assert_equal(numpy.array([True] * 2 + [False] * 46), antennafield_proxy.HBAT_ANT_mask_RW) + numpy.testing.assert_equal(numpy.array([True] * 2 + [False] * 46 + [False] * 48), recv_proxy.ANT_mask_RW) + + def test_HBAT_ANT_mask_RW_configured_after_Antenna_Usage_Mask_only_one_functioning_antenna(self): + """ Verify if HBAT_ANT_mask_RW values are correctly configured from Antenna_Usage_Mask values (only second antenna is OK)""" + recv_proxy = self.setup_recv_proxy() + antennafield_proxy = self.proxy + antenna_qualities = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * 94) + antenna_use = numpy.array([AntennaUse.AUTO] * 96) + antenna_properties = {'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use} + mapping_properties = {"RECV_devices": ["STAT/RECV/1"], + "HBAT_Power_to_RECV_mapping": [-1, -1] * 48, + "HBAT_Control_to_RECV_mapping": [1, 0 , 1, 1] + [-1, -1] * 46} + antennafield_proxy.off() + antennafield_proxy.put_property(antenna_properties) + antennafield_proxy.put_property(mapping_properties) + antennafield_proxy.boot() # initialise hardware values as well + numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 94), antennafield_proxy.Antenna_Usage_Mask_R) + numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 46), antennafield_proxy.HBAT_ANT_mask_RW) + numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 46 + [False] * 48), recv_proxy.ANT_mask_RW) diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py index c62076477d0596d2fbf9993b9e81db69a15c12d7..f8857a0f52cb9c9e84394863b52c41e0cbf461b3 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_digitalbeam.py @@ -9,6 +9,7 @@ # See LICENSE.txt for more info. from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy +from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse from .base import AbstractTestBases @@ -16,6 +17,10 @@ import numpy class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): + antenna_qualities_ok = numpy.array([AntennaQuality.OK] * 96) + antenna_qualities_only_second = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * 94) + antenna_use_ok = numpy.array([AntennaUse.AUTO] * 96) + def setUp(self): """Intentionally recreate the device object in each test""" super().setUp("STAT/DigitalBeam/1") @@ -44,8 +49,21 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): sdp_proxy.warm_boot() sdp_proxy.set_defaults() return sdp_proxy + + def setup_antennafield_proxy(self, antenna_qualities, antenna_use): + # setup AntennaField + NR_TILES = 48 + antennafield_proxy = TestDeviceProxy("STAT/AntennaField/1") + control_mapping = [[1,i] for i in range(NR_TILES)] + antennafield_proxy.put_property({"RECV_devices": ["STAT/RECV/1"], + "HBAT_Control_to_RECV_mapping": numpy.array(control_mapping).flatten(), + 'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use}) + antennafield_proxy.off() + antennafield_proxy.boot() + return antennafield_proxy def test_pointing_to_zenith(self): + self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok) self.setup_sdp_proxy() self.setup_recv_proxy() # Setup beamlet configuration @@ -61,3 +79,25 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): # beam weights should now be non-zero, we don't actually check their values for correctness self.assertNotEqual(0, sum(self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW.flatten())) + + def test_input_select_with_all_antennas_ok(self): + """ Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask """ + antennafield_proxy = self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok) + numpy.testing.assert_equal(numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R) + self.setUp() + self.proxy.warm_boot() + expected_input_select = numpy.array([[True] * 488 ] * 48 + [[False] * 488] * 48) # first 48 rows are True + numpy.testing.assert_equal(expected_input_select, self.proxy.input_select_RW) + expected_antenna_select = numpy.array([[True] * 488 ] * 48) + numpy.testing.assert_equal(expected_antenna_select, self.proxy.antenna_select_RW) + + def test_input_select_with_only_second_antenna_ok(self): + """ Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask """ + antennafield_proxy = self.setup_antennafield_proxy(self.antenna_qualities_only_second, self.antenna_use_ok) + numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 94), antennafield_proxy.Antenna_Usage_Mask_R) + self.setUp() + self.proxy.warm_boot() + expected_input_select = numpy.array([[False] * 488 ] + [[True] * 488] + [[False] * 488] * 46 + [[False] * 488] * 48) # first 48 rows are True + numpy.testing.assert_equal(expected_input_select, self.proxy.input_select_RW) + expected_antenna_select = numpy.array([[False] * 488 ] + [[True] * 488] + [[False] * 488] * 46) + numpy.testing.assert_equal(expected_antenna_select, self.proxy.antenna_select_RW) diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py index ce09175933edc18baf555ba091bb8b72ebdb1781..d3f70dd07cf441be3f6deb75ce8358c861987194 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_observation.py @@ -14,6 +14,7 @@ from tango import DevState, DevFailed from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy from tangostationcontrol.test.devices.test_observation_base import TestObservationBase +from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse from .base import AbstractTestBases class TestDeviceObservation(AbstractTestBases.TestDeviceBase): @@ -69,11 +70,13 @@ class TestDeviceObservation(AbstractTestBases.TestDeviceBase): # setup AntennaField antennafield_proxy = TestDeviceProxy("STAT/AntennaField/1") control_mapping = [[1,i] for i in range(self.NUM_TILES)] + antenna_qualities = numpy.array([AntennaQuality.OK] * 96) + antenna_use = numpy.array([AntennaUse.AUTO] * 96) antennafield_proxy.put_property({"RECV_devices": ["STAT/RECV/1"], - "HBAT_Power_to_RECV_mapping": numpy.array(control_mapping).flatten()}) + "HBAT_Control_to_RECV_mapping": numpy.array(control_mapping).flatten(), + 'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use}) antennafield_proxy.off() - antennafield_proxy.warm_boot() - antennafield_proxy.set_defaults() + antennafield_proxy.boot() return antennafield_proxy def setup_beamlet_proxy(self): diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_tilebeam.py b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_tilebeam.py index 32dfa8e322100fa16a86c1ab7d8610f617e315ed..991e03543e70c8b308319e45766f797e530a71c6 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_tilebeam.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_tilebeam.py @@ -13,6 +13,7 @@ import datetime import json from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy +from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse from .base import AbstractTestBases @@ -45,11 +46,13 @@ class TestDeviceTileBeam(AbstractTestBases.TestDeviceBase): # setup AntennaField antennafield_proxy = TestDeviceProxy("STAT/AntennaField/1") control_mapping = [[1,i] for i in range(self.NR_TILES)] + antenna_qualities = numpy.array([AntennaQuality.OK] * 96) + antenna_use = numpy.array([AntennaUse.AUTO] * 96) antennafield_proxy.put_property({"RECV_devices": ["STAT/RECV/1"], - "HBAT_Power_to_RECV_mapping": numpy.array(control_mapping).flatten()}) + "HBAT_Control_to_RECV_mapping": numpy.array(control_mapping).flatten(), + 'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use}) antennafield_proxy.off() - antennafield_proxy.warm_boot() - antennafield_proxy.set_defaults() + antennafield_proxy.boot() # check if AntennaField really exposes the expected number of tiles self.assertEqual(self.NR_TILES, antennafield_proxy.nr_tiles_R) diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py index eca4686a8c65aec51980734383eb80ee3ce1fc15..96eb83300bec3352b1c0572653f38bf96a2e77d9 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_antennafield_device.py @@ -12,7 +12,7 @@ import numpy from tango.test_context import DeviceTestContext from tangostationcontrol.devices import antennafield -from tangostationcontrol.devices.antennafield import HBATToRecvMapper +from tangostationcontrol.devices.antennafield import HBATToRecvMapper, AntennaQuality, AntennaUse from tangostationcontrol.test import base from tangostationcontrol.test.devices import device_base @@ -340,6 +340,14 @@ class TestAntennafieldDevice(device_base.DeviceTestCase): AT_PROPERTIES = {'OPC_Server_Name': 'example.com', 'OPC_Server_Port': 4840, 'OPC_Time_Out': 5.0, 'Antenna_Field_Reference_ITRF' : [3.0, 3.0, 3.0], 'Antenna_Field_Reference_ETRS' : [7.0, 7.0, 7.0]} + # A mapping where HBATs are all not mapped to power RCUs + POWER_NOT_CONNECTED = [[-1, -1]] * 48 + # A mapping where HBATs are all not mapped to control RCUs + CONTROL_NOT_CONNECTED = [[-1, -1]] * 48 + # A mapping where first two HBATs are mapped on the first Receiver. + # The first HBAT control line on RCU 1 and the second HBAT control line on RCU 0. + CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1 = [[1, 1], [1, 0]] + [[-1, -1]] * 46 + def setUp(self): # DeviceTestCase setUp patches lofar_device DeviceProxy super(TestAntennafieldDevice, self).setUp() @@ -353,4 +361,31 @@ class TestAntennafieldDevice(device_base.DeviceTestCase): at_properties_v2 = {'OPC_Server_Name': 'example.com', 'OPC_Server_Port': 4840, 'OPC_Time_Out': 5.0, 'Antenna_Field_Reference_ETRS' : [7.0, 7.0, 7.0]} with DeviceTestContext(antennafield.AntennaField, properties=at_properties_v2, process=True) as proxy: self.assertNotEqual(3.0, proxy.Antenna_Field_Reference_ITRF_R[0]) # value = 6.948998835785814 - + + def test_read_Antenna_Quality(self): + """ Verify if Antenna_Quality_R is correctly retrieved """ + antenna_qualities = numpy.array([AntennaQuality.OK] * 96) + with DeviceTestContext(antennafield.AntennaField, properties=self.AT_PROPERTIES, process=True) as proxy: + numpy.testing.assert_equal(antenna_qualities, proxy.Antenna_Quality_R) + + def test_read_Antenna_Use(self): + """ Verify if Antenna_Use_R is correctly retrieved """ + antenna_use = numpy.array([AntennaUse.AUTO] * 96) + with DeviceTestContext(antennafield.AntennaField, properties=self.AT_PROPERTIES, process=True) as proxy: + numpy.testing.assert_equal(antenna_use, proxy.Antenna_Use_R) + + def test_read_Antenna_Usage_Mask(self): + """ Verify if Antenna_Usage_Mask_R is correctly retrieved """ + antenna_qualities = numpy.array([AntennaQuality.OK] * 96) + antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * 95) + antenna_properties = {'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use} + with DeviceTestContext(antennafield.AntennaField, properties={**self.AT_PROPERTIES, **antenna_properties}, process=True) as proxy: + numpy.testing.assert_equal(numpy.array([True] * 96), proxy.Antenna_Usage_Mask_R) + + def test_read_Antenna_Usage_Mask_only_one_functioning_antenna(self): + """ Verify if Antenna_Usage_Mask_R (only first antenna is OK) is correctly retrieved """ + antenna_qualities = numpy.array([AntennaQuality.OK] + [AntennaQuality.BROKEN] * 95) + antenna_use = numpy.array([AntennaUse.ON] + [AntennaUse.AUTO] * 95) + antenna_properties = {'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use} + with DeviceTestContext(antennafield.AntennaField, properties={**self.AT_PROPERTIES, **antenna_properties}, process=True) as proxy: + numpy.testing.assert_equal(numpy.array([True] + [False] * 95), proxy.Antenna_Usage_Mask_R)