Skip to content
Snippets Groups Projects
Commit c8b7e854 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

L2SS-827: update input select in digitalbeam following antenna states

parent ae8b6018
No related branches found
No related tags found
1 merge request!389Resolve L2SS-827 "Implement antenna state"
...@@ -326,18 +326,6 @@ class AntennaField(lofar_device): ...@@ -326,18 +326,6 @@ class AntennaField(lofar_device):
def read_HBAT_reference_GEOHASH_R(self): def read_HBAT_reference_GEOHASH_R(self):
return GEO_to_GEOHASH(self.read_HBAT_reference_GEO_R()) return GEO_to_GEOHASH(self.read_HBAT_reference_GEO_R())
@log_exceptions()
def configure_for_initialise(self):
super().configure_for_initialise()
self.__setup_all_receiver_proxies()
self.__setup_mapper()
@log_exceptions()
def _initialise_hardware(self):
# Disable controlling the tiles that fall outside the mask
# WARN: Needed in configure_for_initialise but Tango does not allow to write attributes in INIT state
self.proxy.write_attribute('HBAT_ANT_mask_RW', self.read_attribute('Antenna_Usage_Mask_R'))
def __setup_all_receiver_proxies(self): def __setup_all_receiver_proxies(self):
self.recv_proxies = [] self.recv_proxies = []
...@@ -371,6 +359,22 @@ class AntennaField(lofar_device): ...@@ -371,6 +359,22 @@ class AntennaField(lofar_device):
for idx, recv_proxy in enumerate(self.recv_proxies): for idx, recv_proxy in enumerate(self.recv_proxies):
recv_proxy.write_attribute(mapped_point, mapped_value[idx]) recv_proxy.write_attribute(mapped_point, mapped_value[idx])
# --------
# Overloaded functions
# --------
@log_exceptions()
def configure_for_initialise(self):
super().configure_for_initialise()
self.__setup_all_receiver_proxies()
self.__setup_mapper()
@log_exceptions()
def _initialise_hardware(self):
# Disable controlling the tiles that fall outside the mask
# WARN: Needed in configure_for_initialise but Tango does not allow to write attributes in INIT state
self.proxy.write_attribute('HBAT_ANT_mask_RW', self.read_attribute('Antenna_Usage_Mask_R'))
# -------- # --------
# Commands # Commands
# -------- # --------
......
...@@ -99,7 +99,7 @@ class DigitalBeam(beam_device): ...@@ -99,7 +99,7 @@ class DigitalBeam(beam_device):
def write_antenna_select_RW(self, antennas): def write_antenna_select_RW(self, antennas):
for input_nr, antenna_nr in enumerate(self.Input_to_Antenna_Mapping): for input_nr, antenna_nr in enumerate(self.Input_to_Antenna_Mapping):
if antenna_nr >= 0: if self.antennafield_proxy.Antenna_Usage_Mask_R[input_nr] and antenna_nr >= 0:
self._input_select[input_nr] = antennas[antenna_nr] self._input_select[input_nr] = antennas[antenna_nr]
# ---------- # ----------
...@@ -120,6 +120,7 @@ class DigitalBeam(beam_device): ...@@ -120,6 +120,7 @@ class DigitalBeam(beam_device):
self.antennafield_proxy = DeviceProxy(self.AntennaField_Device) self.antennafield_proxy = DeviceProxy(self.AntennaField_Device)
self.antennafield_proxy.set_source(DevSource.DEV) self.antennafield_proxy.set_source(DevSource.DEV)
antenna_usage_mask = self.antennafield_proxy.Antenna_Usage_Mask_R
self.beamlet_proxy = DeviceProxy(self.Beamlet_Device) self.beamlet_proxy = DeviceProxy(self.Beamlet_Device)
self.beamlet_proxy.set_source(DevSource.DEV) self.beamlet_proxy.set_source(DevSource.DEV)
...@@ -132,7 +133,7 @@ class DigitalBeam(beam_device): ...@@ -132,7 +133,7 @@ class DigitalBeam(beam_device):
# Use reference position for any missing antennas so they always get a delay of 0 # Use reference position for any missing antennas so they always get a delay of 0
input_itrf = numpy.array([reference_itrf] * self.NUM_INPUTS) input_itrf = numpy.array([reference_itrf] * self.NUM_INPUTS)
for input_nr, antenna_nr in enumerate(self.Input_to_Antenna_Mapping): for input_nr, antenna_nr in enumerate(self.Input_to_Antenna_Mapping):
if antenna_nr >= 0: if antenna_usage_mask[input_nr] and antenna_nr >= 0:
input_itrf[input_nr] = antenna_itrf[antenna_nr] input_itrf[input_nr] = antenna_itrf[antenna_nr]
# a delay calculator # a delay calculator
...@@ -142,7 +143,8 @@ class DigitalBeam(beam_device): ...@@ -142,7 +143,8 @@ class DigitalBeam(beam_device):
self.relative_input_positions = input_itrf - reference_itrf self.relative_input_positions = input_itrf - reference_itrf
# use all antennas in the mapping for all beamlets, unless specified otherwise # use all antennas in the mapping for all beamlets, unless specified otherwise
input_select = numpy.repeat((numpy.array(self.Input_to_Antenna_Mapping) >= 0), self.NUM_BEAMLETS).reshape(self.NUM_INPUTS, self.NUM_BEAMLETS) input_select_mask = numpy.logical_and(numpy.array(self.Input_to_Antenna_Mapping) >= 0, antenna_usage_mask)
input_select = numpy.repeat(input_select_mask, self.NUM_BEAMLETS).reshape(self.NUM_INPUTS, self.NUM_BEAMLETS)
self.write_input_select_RW(input_select) self.write_input_select_RW(input_select)
# -------- # --------
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
# See LICENSE.txt for more info. # See LICENSE.txt for more info.
from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy
from tangostationcontrol.devices.antennafield import AntennaQuality, AntennaUse
from .base import AbstractTestBases from .base import AbstractTestBases
...@@ -16,6 +17,10 @@ import numpy ...@@ -16,6 +17,10 @@ import numpy
class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
antenna_qualities_ok = numpy.array([AntennaQuality.OK] * 96)
antenna_qualities_only_second = numpy.array([AntennaQuality.BROKEN] + [AntennaQuality.OK] + [AntennaQuality.BROKEN] * 94)
antenna_use_ok = numpy.array([AntennaUse.AUTO] * 96)
def setUp(self): def setUp(self):
"""Intentionally recreate the device object in each test""" """Intentionally recreate the device object in each test"""
super().setUp("STAT/DigitalBeam/1") super().setUp("STAT/DigitalBeam/1")
...@@ -37,7 +42,30 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): ...@@ -37,7 +42,30 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
beamlet_proxy.set_defaults() beamlet_proxy.set_defaults()
return beamlet_proxy return beamlet_proxy
def setup_sdp_proxy(self):
# setup SDP, on which this device depends
sdp_proxy = TestDeviceProxy("STAT/SDP/1")
sdp_proxy.off()
sdp_proxy.warm_boot()
sdp_proxy.set_defaults()
return sdp_proxy
def setup_antennafield_proxy(self, antenna_qualities, antenna_use):
# setup AntennaField
NR_TILES = 48
antennafield_proxy = TestDeviceProxy("STAT/AntennaField/1")
control_mapping = [[1,i] for i in range(NR_TILES)]
antennafield_proxy.put_property({"RECV_devices": ["STAT/RECV/1"],
"HBAT_Control_to_RECV_mapping": numpy.array(control_mapping).flatten(),
'Antenna_Quality': antenna_qualities, 'Antenna_Use': antenna_use})
antennafield_proxy.off()
antennafield_proxy.boot()
return antennafield_proxy
def test_pointing_to_zenith(self): def test_pointing_to_zenith(self):
self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
self.setup_sdp_proxy()
self.setup_recv_proxy()
# Setup beamlet configuration # Setup beamlet configuration
self.beamlet_proxy.clock_RW = 200 * 1000000 self.beamlet_proxy.clock_RW = 200 * 1000000
self.beamlet_proxy.subband_select = list(range(488)) self.beamlet_proxy.subband_select = list(range(488))
...@@ -51,3 +79,25 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase): ...@@ -51,3 +79,25 @@ class TestDeviceDigitalBeam(AbstractTestBases.TestDeviceBase):
# beam weights should now be non-zero, we don't actually check their values for correctness # beam weights should now be non-zero, we don't actually check their values for correctness
self.assertNotEqual(0, sum(self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW.flatten())) self.assertNotEqual(0, sum(self.beamlet_proxy.FPGA_bf_weights_xx_yy_RW.flatten()))
def test_input_select_with_all_antennas_ok(self):
""" Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask """
antennafield_proxy = self.setup_antennafield_proxy(self.antenna_qualities_ok, self.antenna_use_ok)
numpy.testing.assert_equal(numpy.array([True] * 96), antennafield_proxy.Antenna_Usage_Mask_R)
self.setUp()
self.proxy.warm_boot()
expected_input_select = numpy.array([[True] * 488 ] * 48 + [[False] * 488] * 48) # first 48 rows are True
numpy.testing.assert_equal(expected_input_select, self.proxy.input_select_RW)
expected_antenna_select = numpy.array([[True] * 488 ] * 48)
numpy.testing.assert_equal(expected_antenna_select, self.proxy.antenna_select_RW)
def test_input_select_with_only_second_antenna_ok(self):
""" Verify if input and antenna select are correctly calculated following Antennafield.Antenna_Usage_Mask """
antennafield_proxy = self.setup_antennafield_proxy(self.antenna_qualities_only_second, self.antenna_use_ok)
numpy.testing.assert_equal(numpy.array([False] + [True] + [False] * 94), antennafield_proxy.Antenna_Usage_Mask_R)
self.setUp()
self.proxy.warm_boot()
expected_input_select = numpy.array([[False] * 488 ] + [[True] * 488] + [[False] * 488] * 46 + [[False] * 488] * 48) # first 48 rows are True
numpy.testing.assert_equal(expected_input_select, self.proxy.input_select_RW)
expected_antenna_select = numpy.array([[False] * 488 ] + [[True] * 488] + [[False] * 488] * 46)
numpy.testing.assert_equal(expected_antenna_select, self.proxy.antenna_select_RW)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment