Skip to content
Snippets Groups Projects
Commit 6870bb0a authored by Hannes Feldt's avatar Hannes Feldt
Browse files

Merge branch 'L2SS-1007-reshape-rcu-antenna-fields' into 'master'

#L2SS-1007 Re-arrange RCU antenna metrics to [32,3]

Closes L2SS-1007

See merge request !459
parents 15046c6b 58218d37
No related branches found
No related tags found
1 merge request!459#L2SS-1007 Re-arrange RCU antenna metrics to [32,3]
......@@ -25,6 +25,7 @@ from tangostationcontrol.beam.geo import ETRS_to_ITRF, ITRF_to_GEO, GEO_to_GEOHA
from tangostationcontrol.beam.hba_tile import HBATAntennaOffsets, NUMBER_OF_ELEMENTS_PER_TILE
import logging
logger = logging.getLogger()
__all__ = ["AntennaField", "AntennaToRecvMapper", "main"]
......@@ -32,10 +33,12 @@ __all__ = ["AntennaField", "AntennaToRecvMapper", "main"]
# Highest number of HBA tiles we support per AntennaField
MAX_NUMBER_OF_HBAT = 96
class AntennaUse(IntEnum):
AUTO = 0 # use antenna only if its OK or SUSPICIOUS
ON = 1 # force antenna to be on, regardless of quality
OFF = 2 # force antenna to be off, regardless of quality
AUTO = 0 # use antenna only if its OK or SUSPICIOUS
ON = 1 # force antenna to be on, regardless of quality
OFF = 2 # force antenna to be off, regardless of quality
class AntennaQuality(IntEnum):
OK = 0
......@@ -43,6 +46,7 @@ class AntennaQuality(IntEnum):
BROKEN = 2
BEYOND_REPAIR = 3
class mapped_attribute(attribute):
def __init__(self, mapping_attribute, dtype, max_dim_x, max_dim_y=0, access=AttrWriteType.READ, **kwargs):
......@@ -62,12 +66,12 @@ class mapped_attribute(attribute):
self.fget = read_func_wrapper
super().__init__(dtype=dtype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access, fisallowed="is_attribute_access_allowed", **kwargs)
super().__init__(dtype=dtype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access,
fisallowed="is_attribute_access_allowed", **kwargs)
@device_logging_to_python()
class AntennaField(lofar_device):
""" Manages the antennas in a single antenna field, by acting as a
a mapping onto one or more RECV devices.
......@@ -96,7 +100,7 @@ class AntennaField(lofar_device):
doc="Name of each antenna",
dtype='DevVarStringArray',
mandatory=False,
default_value = [f'Antenna{n+1}' for n in range(MAX_NUMBER_OF_HBAT)]
default_value=[f'Antenna{n + 1}' for n in range(MAX_NUMBER_OF_HBAT)]
)
# ----- Antenna states
......@@ -105,14 +109,14 @@ class AntennaField(lofar_device):
doc="Operational quality state of each antenna",
dtype='DevVarUShortArray',
mandatory=False,
default_value = numpy.array([AntennaQuality.OK] * MAX_NUMBER_OF_HBAT)
default_value=numpy.array([AntennaQuality.OK] * MAX_NUMBER_OF_HBAT)
)
Antenna_Use = device_property(
doc="Operational State of each antenna",
dtype='DevVarUShortArray',
mandatory=False,
default_value = numpy.array([AntennaUse.AUTO] * MAX_NUMBER_OF_HBAT)
default_value=numpy.array([AntennaUse.AUTO] * MAX_NUMBER_OF_HBAT)
)
# ----- Antenna properties
......@@ -121,7 +125,7 @@ class AntennaField(lofar_device):
doc="Whether to provide power to each antenna (False for noise sources)",
dtype='DevVarBooleanArray',
mandatory=False,
default_value = numpy.array([False] * MAX_NUMBER_OF_HBAT)
default_value=numpy.array([False] * MAX_NUMBER_OF_HBAT)
)
# ----- Position information
......@@ -154,37 +158,37 @@ class AntennaField(lofar_device):
doc="Reference frame in which the ITRF coordinates are provided, or are to be computed from ETRS89",
dtype='DevString',
mandatory=False,
default_value = "ITRF2005"
default_value="ITRF2005"
)
ITRF_Reference_Epoch = device_property(
doc="Reference epoch in which the ITRF coordinates are provided, or are to be extrapolated from ETRS89",
dtype='DevFloat',
mandatory=False,
default_value = 2015.5
default_value=2015.5
)
HBAT_PQR_rotation_angles_deg = device_property(
doc='Rotation of each tile in the PQ plane ("horizontal") in degrees.',
dtype='DevVarFloatArray',
mandatory=False,
default_value = [0.0] * MAX_NUMBER_OF_HBAT
default_value=[0.0] * MAX_NUMBER_OF_HBAT
)
PQR_to_ETRS_rotation_matrix = device_property(
doc="Field-specific rotation matrix to convert PQR offsets to ETRS/ITRF offsets.",
dtype='DevVarFloatArray',
mandatory=False,
default_value = numpy.array([ # PQR->ETRS rotation matrix for the core stations
[-0.1195951054, -0.7919544517, 0.5987530018],
[ 0.9928227484, -0.0954186800, 0.0720990002],
[ 0.0000330969, 0.6030782884, 0.7976820024]]).flatten()
default_value=numpy.array([ # PQR->ETRS rotation matrix for the core stations
[-0.1195951054, -0.7919544517, 0.5987530018],
[0.9928227484, -0.0954186800, 0.0720990002],
[0.0000330969, 0.6030782884, 0.7976820024]]).flatten()
)
HBAT_base_antenna_offsets = device_property(
doc="Offsets of the antennas in a HBAT, with respect to its reference center (16x3).",
dtype='DevVarFloatArray',
mandatory=False,
default_value = HBATAntennaOffsets.HBAT1_BASE_ANTENNA_OFFSETS.flatten()
default_value=HBATAntennaOffsets.HBAT1_BASE_ANTENNA_OFFSETS.flatten()
)
# ----- SDP mapping
......@@ -192,14 +196,14 @@ class AntennaField(lofar_device):
Antenna_to_SDP_Mapping = device_property(
dtype=(numpy.int32,),
doc='The mapping of Antennas to FPGA input pairs. Each FPGA can handle 6 inputs, and SDP has 16 FPGAs. Each antenna is represented with a (fpga, input) value pair. The array is flattened, so must be reshaped upon use. An input=-1 means the antenna is unconnected.',
default_value = numpy.array([-1] * MAX_NUMBER_OF_HBAT * 2, dtype=numpy.int32)
default_value=numpy.array([-1] * MAX_NUMBER_OF_HBAT * 2, dtype=numpy.int32)
)
SDP_device = device_property(
dtype=str,
doc='Which SDP device is processing this AntennaField.',
mandatory=False,
default_value = "STAT/SDP/1"
default_value="STAT/SDP/1"
)
# ----- RECV mapping
......@@ -208,82 +212,97 @@ class AntennaField(lofar_device):
dtype=(numpy.int32,),
doc='The mapping of Antenna power lines to RECV mapping. Each RECV can handle 96 inputs. The Antenna number is the index and the value shows to which receiver device it is connected and on which input. The first integer is the input. The second integer is the RECV id. Example: [0, 3] = first receiver of property RECV_devices with input 3. -1 means that the Antenna is not connected. The property is stored in a one dimensional structure. It needs to be reshaped to a list of lists of two items.',
mandatory=False,
default_value = [-1] * MAX_NUMBER_OF_HBAT * 2
default_value=[-1] * MAX_NUMBER_OF_HBAT * 2
)
Control_to_RECV_mapping = device_property(
dtype=(numpy.int32,),
doc='The mapping of Antenna control lines to RECV mapping. Each RECV can handle 96 inputs. The Antenna number is the index and the value shows to which receiver device it is connected and on which input. The first integer is the input. The second interger is the RECV id. Example: [1, 3] = STAT/RECV/1 with input 3. -1 means that the Antenna is not connected. The property is stored in a one dimensional structure. It needs to be reshaped to a list of lists of two items.',
mandatory=False,
default_value = [-1] * MAX_NUMBER_OF_HBAT * 2
default_value=[-1] * MAX_NUMBER_OF_HBAT * 2
)
RECV_devices = device_property(
dtype=(str,),
doc='Which Recv devices are in use by the AntennaField. The order is important and it should match up with the order of the mapping.',
mandatory=False,
default_value = []
default_value=[]
)
Antenna_Names_R = attribute(access=AttrWriteType.READ,
dtype=(str,), max_dim_x=MAX_NUMBER_OF_HBAT)
dtype=(str,), max_dim_x=MAX_NUMBER_OF_HBAT)
Antenna_Quality_R = attribute(doc='The quality of each antenna. 0=OK, 1=SUSPICIOUS, 2=BROKEN, 3=BEYOND_REPAIR.',
dtype=(numpy.uint32,), max_dim_x=MAX_NUMBER_OF_HBAT)
Antenna_Use_R = attribute(doc='Whether each antenna should be used. 0=AUTO, 1=ON, 2=OFF. In AUTO mode, the antenna is used if it is not BROKEN or BEYOND_REPAIR.',
dtype=(numpy.uint32,), max_dim_x=MAX_NUMBER_OF_HBAT)
Antenna_Use_R = attribute(
doc='Whether each antenna should be used. 0=AUTO, 1=ON, 2=OFF. In AUTO mode, the antenna is used if it is not BROKEN or BEYOND_REPAIR.',
dtype=(numpy.uint32,), max_dim_x=MAX_NUMBER_OF_HBAT)
Antenna_Quality_str_R = attribute(doc='The quality of each antenna, as a string.',
dtype=(str,), max_dim_x=MAX_NUMBER_OF_HBAT)
dtype=(str,), max_dim_x=MAX_NUMBER_OF_HBAT)
Antenna_Use_str_R = attribute(doc='Whether each antenna should be used, as a string.',
dtype=(str,), max_dim_x=MAX_NUMBER_OF_HBAT)
dtype=(str,), max_dim_x=MAX_NUMBER_OF_HBAT)
Antenna_Usage_Mask_R = attribute(doc='Whether each antenna will be used.',
dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT)
Antenna_to_SDP_Mapping_R = attribute(doc='To which (fpga, input) pair each antenna is connected. -1=unconnected.',
dtype=((numpy.int32,),), max_dim_x=2, max_dim_y=MAX_NUMBER_OF_HBAT)
ANT_mask_RW = mapped_attribute("ANT_mask_RW", dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
RCU_PWR_ANT_on_R = mapped_attribute("RCU_PWR_ANT_on_R", dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT)
RCU_PWR_ANT_on_RW = mapped_attribute("RCU_PWR_ANT_on_RW", dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_BF_delay_steps_R = mapped_attribute("HBAT_BF_delay_steps_R", dtype=((numpy.int64,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_BF_delay_steps_RW = mapped_attribute("HBAT_BF_delay_steps_RW", dtype=((numpy.int64,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_LED_on_R = mapped_attribute("HBAT_LED_on_R", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_LED_on_RW = mapped_attribute("HBAT_LED_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_PWR_LNA_on_R = mapped_attribute("HBAT_PWR_LNA_on_R", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_PWR_LNA_on_RW = mapped_attribute("HBAT_PWR_LNA_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_PWR_on_R = mapped_attribute("HBAT_PWR_on_R", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_PWR_on_RW = mapped_attribute("HBAT_PWR_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
RCU_band_select_RW = mapped_attribute("RCU_band_select_RW", dtype=(numpy.int64,), max_dim_x=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT)
Antenna_to_SDP_Mapping_R = attribute(doc='To which (fpga, input) pair each antenna is connected. -1=unconnected.',
dtype=((numpy.int32,),), max_dim_x=2, max_dim_y=MAX_NUMBER_OF_HBAT)
ANT_mask_RW = mapped_attribute("ANT_mask_RW", dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT,
access=AttrWriteType.READ_WRITE)
RCU_PWR_ANT_on_R = mapped_attribute("RCU_PWR_ANT_on_R", dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT)
RCU_PWR_ANT_on_RW = mapped_attribute("RCU_PWR_ANT_on_RW", dtype=(bool,), max_dim_x=MAX_NUMBER_OF_HBAT,
access=AttrWriteType.READ_WRITE)
HBAT_BF_delay_steps_R = mapped_attribute("HBAT_BF_delay_steps_R", dtype=((numpy.int64,),),
max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_BF_delay_steps_RW = mapped_attribute("HBAT_BF_delay_steps_RW", dtype=((numpy.int64,),),
max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT,
access=AttrWriteType.READ_WRITE)
HBAT_LED_on_R = mapped_attribute("HBAT_LED_on_R", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2,
max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_LED_on_RW = mapped_attribute("HBAT_LED_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2,
max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
HBAT_PWR_LNA_on_R = mapped_attribute("HBAT_PWR_LNA_on_R", dtype=((bool,),),
max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_PWR_LNA_on_RW = mapped_attribute("HBAT_PWR_LNA_on_RW", dtype=((bool,),),
max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2, max_dim_y=MAX_NUMBER_OF_HBAT,
access=AttrWriteType.READ_WRITE)
HBAT_PWR_on_R = mapped_attribute("HBAT_PWR_on_R", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2,
max_dim_y=MAX_NUMBER_OF_HBAT)
HBAT_PWR_on_RW = mapped_attribute("HBAT_PWR_on_RW", dtype=((bool,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 2,
max_dim_y=MAX_NUMBER_OF_HBAT, access=AttrWriteType.READ_WRITE)
RCU_band_select_RW = mapped_attribute("RCU_band_select_RW", dtype=(numpy.int64,), max_dim_x=MAX_NUMBER_OF_HBAT,
access=AttrWriteType.READ_WRITE)
# ----- Position information
Antenna_Field_Reference_ITRF_R = attribute(access=AttrWriteType.READ,
doc='Absolute reference position of antenna field, in ITRF (XYZ)',
dtype=(numpy.float64,), max_dim_x=3)
doc='Absolute reference position of antenna field, in ITRF (XYZ)',
dtype=(numpy.float64,), max_dim_x=3)
Antenna_Field_Reference_GEO_R = attribute(access=AttrWriteType.READ,
doc='Absolute reference position of antenna field, in latitude/longitude (degrees)',
dtype=(numpy.float64,), max_dim_x=2)
doc='Absolute reference position of antenna field, in latitude/longitude (degrees)',
dtype=(numpy.float64,), max_dim_x=2)
Antenna_Field_Reference_GEOHASH_R = attribute(access=AttrWriteType.READ,
doc='Absolute reference position of antenna field, as a geohash string',
dtype=str)
doc='Absolute reference position of antenna field, as a geohash string',
dtype=str)
HBAT_antenna_ITRF_offsets_R = attribute(access=AttrWriteType.READ,
doc='For each tile, the offsets of the antennas within that, in ITRF ("iHBADeltas"). True shape: nrtiles x 16 x 3.',
dtype=((numpy.float64,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 3, max_dim_y=96)
doc='For each tile, the offsets of the antennas within that, in ITRF ("iHBADeltas"). True shape: nrtiles x 16 x 3.',
dtype=((numpy.float64,),), max_dim_x=NUMBER_OF_ELEMENTS_PER_TILE * 3,
max_dim_y=96)
Antenna_Reference_ITRF_R = attribute(access=AttrWriteType.READ,
doc='Absolute reference position of each tile, in ITRF (XYZ)',
dtype=((numpy.float64,),), max_dim_x=3, max_dim_y=MAX_NUMBER_OF_HBAT)
doc='Absolute reference position of each tile, in ITRF (XYZ)',
dtype=((numpy.float64,),), max_dim_x=3, max_dim_y=MAX_NUMBER_OF_HBAT)
Antenna_Reference_GEO_R = attribute(access=AttrWriteType.READ,
doc='Absolute reference position of each tile, in latitude/longitude (degrees)',
dtype=((numpy.float64,),), max_dim_x=2, max_dim_y=MAX_NUMBER_OF_HBAT)
doc='Absolute reference position of each tile, in latitude/longitude (degrees)',
dtype=((numpy.float64,),), max_dim_x=2, max_dim_y=MAX_NUMBER_OF_HBAT)
Antenna_Reference_GEOHASH_R = attribute(access=AttrWriteType.READ,
doc='Absolute reference position of each tile, as geohash strings',
dtype=(str,), max_dim_x=MAX_NUMBER_OF_HBAT,)
doc='Absolute reference position of each tile, as geohash strings',
dtype=(str,), max_dim_x=MAX_NUMBER_OF_HBAT, )
nr_antennas_R = attribute(
doc='Number of Antennas in this field',
......@@ -305,16 +324,16 @@ class AntennaField(lofar_device):
return [AntennaQuality(x).name for x in self.Antenna_Quality]
def read_Antenna_Usage_Mask_R(self):
use = numpy.array(self.Antenna_Use)
use = numpy.array(self.Antenna_Use)
quality = numpy.array(self.Antenna_Quality)
antennas_forced_on = use == AntennaUse.ON
antennas_auto_on = numpy.logical_and(use == AntennaUse.AUTO, quality <= AntennaQuality.SUSPICIOUS)
antennas_auto_on = numpy.logical_and(use == AntennaUse.AUTO, quality <= AntennaQuality.SUSPICIOUS)
return numpy.logical_or(antennas_forced_on, antennas_auto_on)
def read_Antenna_to_SDP_Mapping_R(self):
return numpy.array(self.Antenna_to_SDP_Mapping).reshape(-1,2)
return numpy.array(self.Antenna_to_SDP_Mapping).reshape(-1, 2)
def read_nr_antennas_R(self):
# The number of antennas should be equal to:
......@@ -359,27 +378,27 @@ class AntennaField(lofar_device):
tiles lie on the same plane in ITRF. """
# the relative offsets between the elements is fixed in HBAT_base_antenna_offsets
base_antenna_offsets = numpy.array(self.HBAT_base_antenna_offsets).reshape(NUMBER_OF_ELEMENTS_PER_TILE,3)
base_antenna_offsets = numpy.array(self.HBAT_base_antenna_offsets).reshape(NUMBER_OF_ELEMENTS_PER_TILE, 3)
PQR_to_ETRS_rotation_matrix = numpy.array(self.PQR_to_ETRS_rotation_matrix).reshape(3,3)
PQR_to_ETRS_rotation_matrix = numpy.array(self.PQR_to_ETRS_rotation_matrix).reshape(3, 3)
# each tile has its own rotation angle, resulting in different offsets per tile
all_offsets = numpy.array(
[HBATAntennaOffsets.ITRF_offsets(
base_antenna_offsets,
angle_deg * pi / 180,
PQR_to_ETRS_rotation_matrix)
for angle_deg in self.HBAT_PQR_rotation_angles_deg])
[HBATAntennaOffsets.ITRF_offsets(
base_antenna_offsets,
angle_deg * pi / 180,
PQR_to_ETRS_rotation_matrix)
for angle_deg in self.HBAT_PQR_rotation_angles_deg])
return all_offsets.reshape(-1, NUMBER_OF_ELEMENTS_PER_TILE * 3)
def read_Antenna_Reference_ITRF_R(self):
# provide ITRF coordinates if they were configured
if self.Antenna_Reference_ITRF:
return numpy.array(self.Antenna_Reference_ITRF).reshape(-1,3)
return numpy.array(self.Antenna_Reference_ITRF).reshape(-1, 3)
# calculate them from ETRS coordinates if not, using the configured ITRF reference
ETRS_coordinates = numpy.array(self.Antenna_Reference_ETRS).reshape(-1,3)
ETRS_coordinates = numpy.array(self.Antenna_Reference_ETRS).reshape(-1, 3)
return ETRS_to_ITRF(ETRS_coordinates, self.ITRF_Reference_Frame, self.ITRF_Reference_Epoch)
def read_Antenna_Reference_GEO_R(self):
......@@ -462,7 +481,7 @@ class AntennaField(lofar_device):
# Commands
# --------
@command(dtype_in=DevVarFloatArray, dtype_out=DevVarLongArray)
def calculate_HBAT_bf_delay_steps(self, delays: numpy.ndarray):
def calculate_HBAT_bf_delay_steps(self, delays: numpy.ndarray):
num_tiles = self.read_nr_antennas_R()
delays = delays.reshape(num_tiles, NUMBER_OF_ELEMENTS_PER_TILE)
......@@ -472,7 +491,7 @@ class AntennaField(lofar_device):
for recv_idx, recv_proxy in enumerate(self.recv_proxies):
# collect all delays for this recv_proxy
recv_result_indices = numpy.where(control_mapping[:,0] == (recv_idx + 1))
recv_result_indices = numpy.where(control_mapping[:, 0] == (recv_idx + 1))
recv_delays = delays[recv_result_indices]
if not recv_result_indices:
......@@ -480,8 +499,9 @@ class AntennaField(lofar_device):
continue
# convert them into delay steps
flatten_delay_steps = numpy.array(recv_proxy.calculate_HBAT_bf_delay_steps(recv_delays.flatten()), dtype=numpy.int64)
delay_steps = numpy.reshape(flatten_delay_steps,(-1, NUMBER_OF_ELEMENTS_PER_TILE * 2))
flatten_delay_steps = numpy.array(recv_proxy.calculate_HBAT_bf_delay_steps(recv_delays.flatten()),
dtype=numpy.int64)
delay_steps = numpy.reshape(flatten_delay_steps, (-1, NUMBER_OF_ELEMENTS_PER_TILE * 2))
# write back into same positions we collected them from
result_values[recv_result_indices] = delay_steps
......@@ -490,7 +510,6 @@ class AntennaField(lofar_device):
class AntennaToRecvMapper(object):
_VALUE_MAP_NONE_96 = numpy.full(96, None)
_VALUE_MAP_NONE_96_32 = numpy.full((96, 32), None)
......@@ -507,33 +526,39 @@ class AntennaToRecvMapper(object):
self._power_mapping = power_to_recv_mapping
self._number_of_receivers = number_of_receivers
self._default_value_mapping_read = {
"ANT_mask_RW": value_map_ant_bool,
"RCU_PWR_ANT_on_R": value_map_ant_bool,
"RCU_PWR_ANT_on_RW": value_map_ant_bool,
"HBAT_BF_delay_steps_R": value_map_ant_32_int,
"HBAT_BF_delay_steps_RW": value_map_ant_32_int,
"HBAT_LED_on_R": value_map_ant_32_bool,
"HBAT_LED_on_RW": value_map_ant_32_bool,
"HBAT_PWR_LNA_on_R": value_map_ant_32_bool,
"HBAT_PWR_LNA_on_RW": value_map_ant_32_bool,
"HBAT_PWR_on_R": value_map_ant_32_bool,
"HBAT_PWR_on_RW": value_map_ant_32_bool,
"RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64)
"ANT_mask_RW": value_map_ant_bool,
"HBAT_BF_delay_steps_R": value_map_ant_32_int,
"HBAT_BF_delay_steps_RW": value_map_ant_32_int,
"HBAT_LED_on_R": value_map_ant_32_bool,
"HBAT_LED_on_RW": value_map_ant_32_bool,
"HBAT_PWR_LNA_on_R": value_map_ant_32_bool,
"HBAT_PWR_LNA_on_RW": value_map_ant_32_bool,
"HBAT_PWR_on_R": value_map_ant_32_bool,
"HBAT_PWR_on_RW": value_map_ant_32_bool,
"RCU_PWR_ANT_on_R": value_map_ant_bool,
"RCU_PWR_ANT_on_RW": value_map_ant_bool,
"RCU_band_select_RW": numpy.zeros(number_of_antennas, dtype=numpy.int64)
}
self._masked_value_mapping_write = {
"ANT_mask_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"RCU_PWR_ANT_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"HBAT_BF_delay_steps_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_LED_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_PWR_LNA_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_PWR_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"RCU_band_select_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"ANT_mask_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"HBAT_BF_delay_steps_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_LED_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_PWR_LNA_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"HBAT_PWR_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96_32,
"RCU_PWR_ANT_on_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
"RCU_band_select_RW": AntennaToRecvMapper._VALUE_MAP_NONE_96,
}
self._reshape_attributes_in = {
"HBAT_BF_delay_steps_RW": (96, 32),
"RCU_PWR_ANT_on_R": (96,),
"RCU_PWR_ANT_on_RW": (96,),
"RCU_band_select_RW": (96,),
}
self._reshape_attributes_out = {
"HBAT_BF_delay_steps_RW": (96, 32),
"RCU_PWR_ANT_on_R": (32, 3),
"RCU_PWR_ANT_on_RW": (32, 3),
"RCU_band_select_RW": (32, 3),
}
def map_read(self, mapped_attribute: str, recv_results: List[any]) -> List[any]:
......@@ -550,15 +575,14 @@ class AntennaToRecvMapper(object):
if mapped_attribute in self._reshape_attributes_in:
recv_results = numpy.reshape(recv_results,
(self._number_of_receivers,) + self._reshape_attributes_in[mapped_attribute])
(self._number_of_receivers,) + self._reshape_attributes_in[mapped_attribute])
return self._mapped_r_values(recv_results, default_values)
def map_write(self, mapped_attribute: str, set_values: List[any]) -> List[any]:
"""Perform a mapped write for the attribute using the set_values
:param mapped_attribute: attribute identifier as present in
py:attribute:`~_default_value_mapping_write`
:param mapped_attribute: attribute identifier as present in py:attribute:`~_default_value_mapping_write`
:param set_values: The values to be set for the specified attribute
:return: set_values as mapped given attribute dimensions and control mapping
"""
......@@ -603,6 +627,7 @@ class AntennaToRecvMapper(object):
return mapped_values
# ----------
# Run server
# ----------
......
......@@ -28,10 +28,12 @@ from tangostationcontrol.devices.device_decorators import only_in_states
from tangostationcontrol.devices.opcua_device import opcua_device
import logging
logger = logging.getLogger()
__all__ = ["RECV", "main"]
@device_logging_to_python()
class RECV(opcua_device):
......@@ -78,7 +80,8 @@ class RECV(opcua_device):
RCU_PWR_ANT_on_RW_default = device_property(
dtype='DevVarBooleanArray',
mandatory=False,
default_value=[False] * 96 # turn power off by default in test setups, f.e. to prevent blowing up the noise sources
default_value=[False] * 96
# turn power off by default in test setups, f.e. to prevent blowing up the noise sources
)
RECVTR_monitor_rate_RW_default = device_property(
......@@ -122,62 +125,79 @@ class RECV(opcua_device):
10.0463E-9, 10.5774E-9, 11.0509E-9, 11.5289E-9, 11.9374E-9,
12.4524E-9, 13.0842E-9, 13.5936E-9, 13.9198E-9, 14.4087E-9,
14.9781E-9, 15.5063E-9
],dtype=numpy.float64)
], dtype=numpy.float64)
)
HBAT_signal_input_delays = device_property(
doc='Signal input delay calibration values for the elements within a tile.',
dtype='DevVarFloatArray',
mandatory=False,
default_value = numpy.zeros((32,), dtype=numpy.float64)
default_value=numpy.zeros((32,), dtype=numpy.float64)
)
# ----------
# Attributes
# ----------
ANT_mask_RW = attribute_wrapper(comms_annotation=["ANT_mask_RW" ],datatype=bool , dims=(96,), access=AttrWriteType.READ_WRITE)
ANT_mask_RW = attribute_wrapper(comms_annotation=["ANT_mask_RW"], datatype=bool, dims=(96,),
access=AttrWriteType.READ_WRITE)
# The HBAT beamformer delays represent 32 delays for each of the 96 inputs.
# The 32 delays deconstruct as delays[polarisation][dipole], and each delay is the number of 'delay steps' to apply (0.5ns for HBAT1).
HBAT_BF_delay_steps_R = attribute_wrapper(comms_annotation=["HBAT_BF_delay_steps_R" ],datatype=numpy.int64 , dims=(96,16,2))
HBAT_BF_delay_steps_RW = attribute_wrapper(comms_annotation=["HBAT_BF_delay_steps_RW" ],datatype=numpy.int64 , dims=(96,16,2), access=AttrWriteType.READ_WRITE)
HBAT_LED_on_R = attribute_wrapper(comms_annotation=["HBAT_LED_on_R" ],datatype=bool , dims=(96,16,2))
HBAT_LED_on_RW = attribute_wrapper(comms_annotation=["HBAT_LED_on_RW" ],datatype=bool , dims=(96,16,2), access=AttrWriteType.READ_WRITE)
HBAT_PWR_LNA_on_R = attribute_wrapper(comms_annotation=["HBAT_PWR_LNA_on_R" ],datatype=bool , dims=(96,16,2))
HBAT_PWR_LNA_on_RW = attribute_wrapper(comms_annotation=["HBAT_PWR_LNA_on_RW" ],datatype=bool , dims=(96,16,2), access=AttrWriteType.READ_WRITE)
HBAT_PWR_on_R = attribute_wrapper(comms_annotation=["HBAT_PWR_on_R" ],datatype=bool , dims=(96,16,2))
HBAT_PWR_on_RW = attribute_wrapper(comms_annotation=["HBAT_PWR_on_RW" ],datatype=bool , dims=(96,16,2), access=AttrWriteType.READ_WRITE)
RCU_ADC_locked_R = attribute_wrapper(comms_annotation=["RCU_ADC_locked_R" ],datatype=bool , dims=(96,))
RCU_attenuator_dB_R = attribute_wrapper(comms_annotation=["RCU_attenuator_dB_R" ],datatype=numpy.int64 , dims=(96,))
RCU_attenuator_dB_RW = attribute_wrapper(comms_annotation=["RCU_attenuator_dB_RW" ],datatype=numpy.int64 , dims=(96,), access=AttrWriteType.READ_WRITE)
RCU_band_select_R = attribute_wrapper(comms_annotation=["RCU_band_select_R" ],datatype=numpy.int64 , dims=(96,))
RCU_band_select_RW = attribute_wrapper(comms_annotation=["RCU_band_select_RW" ],datatype=numpy.int64 , dims=(96,), access=AttrWriteType.READ_WRITE)
RCU_DTH_freq_R = attribute_wrapper(comms_annotation=["RCU_DTH_freq_R" ],datatype=numpy.int64 , dims=(96,))
RCU_DTH_freq_RW = attribute_wrapper(comms_annotation=["RCU_DTH_freq_RW" ],datatype=numpy.int64 , dims=(96,), access=AttrWriteType.READ_WRITE)
RCU_DTH_on_R = attribute_wrapper(comms_annotation=["RCU_DTH_on_R" ],datatype=bool , dims=(96,))
RCU_LED_green_on_R = attribute_wrapper(comms_annotation=["RCU_LED_green_on_R" ],datatype=bool , dims=(32,))
RCU_LED_green_on_RW = attribute_wrapper(comms_annotation=["RCU_LED_green_on_RW" ],datatype=bool , dims=(32,), access=AttrWriteType.READ_WRITE)
RCU_LED_red_on_R = attribute_wrapper(comms_annotation=["RCU_LED_red_on_R" ],datatype=bool , dims=(32,))
RCU_LED_red_on_RW = attribute_wrapper(comms_annotation=["RCU_LED_red_on_RW" ],datatype=bool , dims=(32,), access=AttrWriteType.READ_WRITE)
RCU_mask_RW = attribute_wrapper(comms_annotation=["RCU_mask_RW" ],datatype=bool , dims=(32,), access=AttrWriteType.READ_WRITE)
RCU_PCB_ID_R = attribute_wrapper(comms_annotation=["RCU_PCB_ID_R" ],datatype=numpy.int64 , dims=(32,))
RCU_PCB_number_R = attribute_wrapper(comms_annotation=["RCU_PCB_number_R" ],datatype=str , dims=(32,))
RCU_PCB_version_R = attribute_wrapper(comms_annotation=["RCU_PCB_version_R" ],datatype=str , dims=(32,))
RCU_PWR_1V8_R = attribute_wrapper(comms_annotation=["RCU_PWR_1V8_R" ],datatype=numpy.float64, dims=(32,))
RCU_PWR_2V5_R = attribute_wrapper(comms_annotation=["RCU_PWR_2V5_R" ],datatype=numpy.float64, dims=(32,))
RCU_PWR_3V3_R = attribute_wrapper(comms_annotation=["RCU_PWR_3V3_R" ],datatype=numpy.float64, dims=(32,))
RCU_PWR_ANALOG_on_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANALOG_on_R" ],datatype=bool , dims=(32,))
RCU_PWR_ANT_IOUT_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_IOUT_R" ],datatype=numpy.float64, dims=(96,))
RCU_PWR_ANT_on_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_on_R" ],datatype=bool , dims=(96,))
RCU_PWR_ANT_on_RW = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_on_RW" ],datatype=bool , dims=(96,), access=AttrWriteType.READ_WRITE)
RCU_PWR_ANT_VIN_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_VIN_R" ],datatype=numpy.float64, dims=(96,))
RCU_PWR_ANT_VOUT_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_VOUT_R" ],datatype=numpy.float64, dims=(96,))
RCU_PWR_DIGITAL_on_R = attribute_wrapper(comms_annotation=["RCU_PWR_DIGITAL_on_R" ],datatype=bool , dims=(32,))
RCU_PWR_good_R = attribute_wrapper(comms_annotation=["RCU_PWR_good_R" ],datatype=bool , dims=(32,))
RCU_TEMP_R = attribute_wrapper(comms_annotation=["RCU_TEMP_R" ],datatype=numpy.float64, dims=(32,))
RECVTR_I2C_error_R = attribute_wrapper(comms_annotation=["RECVTR_I2C_error_R" ],datatype=numpy.int64 , dims=(32,))
RECVTR_monitor_rate_RW = attribute_wrapper(comms_annotation=["RECVTR_monitor_rate_RW" ],datatype=numpy.int64 , access=AttrWriteType.READ_WRITE)
RECVTR_translator_busy_R = attribute_wrapper(comms_annotation=["RECVTR_translator_busy_R" ],datatype=bool)
HBAT_BF_delay_steps_R = attribute_wrapper(comms_annotation=["HBAT_BF_delay_steps_R"], datatype=numpy.int64,
dims=(96, 2, 16))
HBAT_BF_delay_steps_RW = attribute_wrapper(comms_annotation=["HBAT_BF_delay_steps_RW"], datatype=numpy.int64,
dims=(96, 2, 16), access=AttrWriteType.READ_WRITE)
HBAT_LED_on_R = attribute_wrapper(comms_annotation=["HBAT_LED_on_R"], datatype=bool, dims=(96, 2, 16))
HBAT_LED_on_RW = attribute_wrapper(comms_annotation=["HBAT_LED_on_RW"], datatype=bool, dims=(96, 2, 16),
access=AttrWriteType.READ_WRITE)
HBAT_PWR_LNA_on_R = attribute_wrapper(comms_annotation=["HBAT_PWR_LNA_on_R"], datatype=bool, dims=(96, 2, 16))
HBAT_PWR_LNA_on_RW = attribute_wrapper(comms_annotation=["HBAT_PWR_LNA_on_RW"], datatype=bool, dims=(96, 2, 16),
access=AttrWriteType.READ_WRITE)
HBAT_PWR_on_R = attribute_wrapper(comms_annotation=["HBAT_PWR_on_R"], datatype=bool, dims=(96, 2, 16))
HBAT_PWR_on_RW = attribute_wrapper(comms_annotation=["HBAT_PWR_on_RW"], datatype=bool, dims=(96, 2, 16),
access=AttrWriteType.READ_WRITE)
RCU_ADC_locked_R = attribute_wrapper(comms_annotation=["RCU_ADC_locked_R"], datatype=bool, dims=(32, 3))
RCU_attenuator_dB_R = attribute_wrapper(comms_annotation=["RCU_attenuator_dB_R"], datatype=numpy.int64,
dims=(32, 3))
RCU_attenuator_dB_RW = attribute_wrapper(comms_annotation=["RCU_attenuator_dB_RW"], datatype=numpy.int64,
dims=(32, 3), access=AttrWriteType.READ_WRITE)
RCU_band_select_R = attribute_wrapper(comms_annotation=["RCU_band_select_R"], datatype=numpy.int64, dims=(32, 3))
RCU_band_select_RW = attribute_wrapper(comms_annotation=["RCU_band_select_RW"], datatype=numpy.int64, dims=(32, 3),
access=AttrWriteType.READ_WRITE)
RCU_DTH_freq_R = attribute_wrapper(comms_annotation=["RCU_DTH_freq_R"], datatype=numpy.int64, dims=(32, 3))
RCU_DTH_freq_RW = attribute_wrapper(comms_annotation=["RCU_DTH_freq_RW"], datatype=numpy.int64, dims=(32, 3),
access=AttrWriteType.READ_WRITE)
RCU_DTH_on_R = attribute_wrapper(comms_annotation=["RCU_DTH_on_R"], datatype=bool, dims=(32, 3))
RCU_LED_green_on_R = attribute_wrapper(comms_annotation=["RCU_LED_green_on_R"], datatype=bool, dims=(32,))
RCU_LED_green_on_RW = attribute_wrapper(comms_annotation=["RCU_LED_green_on_RW"], datatype=bool, dims=(32,),
access=AttrWriteType.READ_WRITE)
RCU_LED_red_on_R = attribute_wrapper(comms_annotation=["RCU_LED_red_on_R"], datatype=bool, dims=(32,))
RCU_LED_red_on_RW = attribute_wrapper(comms_annotation=["RCU_LED_red_on_RW"], datatype=bool, dims=(32,),
access=AttrWriteType.READ_WRITE)
RCU_mask_RW = attribute_wrapper(comms_annotation=["RCU_mask_RW"], datatype=bool, dims=(32,),
access=AttrWriteType.READ_WRITE)
RCU_PCB_ID_R = attribute_wrapper(comms_annotation=["RCU_PCB_ID_R"], datatype=numpy.int64, dims=(32,))
RCU_PCB_number_R = attribute_wrapper(comms_annotation=["RCU_PCB_number_R"], datatype=str, dims=(32,))
RCU_PCB_version_R = attribute_wrapper(comms_annotation=["RCU_PCB_version_R"], datatype=str, dims=(32,))
RCU_PWR_1V8_R = attribute_wrapper(comms_annotation=["RCU_PWR_1V8_R"], datatype=numpy.float64, dims=(32,))
RCU_PWR_2V5_R = attribute_wrapper(comms_annotation=["RCU_PWR_2V5_R"], datatype=numpy.float64, dims=(32,))
RCU_PWR_3V3_R = attribute_wrapper(comms_annotation=["RCU_PWR_3V3_R"], datatype=numpy.float64, dims=(32,))
RCU_PWR_ANALOG_on_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANALOG_on_R"], datatype=bool, dims=(32,))
RCU_PWR_ANT_IOUT_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_IOUT_R"], datatype=numpy.float64,
dims=(32, 3))
RCU_PWR_ANT_on_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_on_R"], datatype=bool, dims=(32, 3))
RCU_PWR_ANT_on_RW = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_on_RW"], datatype=bool, dims=(32, 3),
access=AttrWriteType.READ_WRITE)
RCU_PWR_ANT_VIN_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_VIN_R"], datatype=numpy.float64, dims=(32, 3))
RCU_PWR_ANT_VOUT_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_VOUT_R"], datatype=numpy.float64,
dims=(32, 3))
RCU_PWR_DIGITAL_on_R = attribute_wrapper(comms_annotation=["RCU_PWR_DIGITAL_on_R"], datatype=bool, dims=(32,))
RCU_PWR_good_R = attribute_wrapper(comms_annotation=["RCU_PWR_good_R"], datatype=bool, dims=(32,))
RCU_TEMP_R = attribute_wrapper(comms_annotation=["RCU_TEMP_R"], datatype=numpy.float64, dims=(32,))
RECVTR_I2C_error_R = attribute_wrapper(comms_annotation=["RECVTR_I2C_error_R"], datatype=numpy.int64, dims=(32,))
RECVTR_monitor_rate_RW = attribute_wrapper(comms_annotation=["RECVTR_monitor_rate_RW"], datatype=numpy.int64,
access=AttrWriteType.READ_WRITE)
RECVTR_translator_busy_R = attribute_wrapper(comms_annotation=["RECVTR_translator_busy_R"], datatype=bool)
# ----------
# Summarising Attributes
......@@ -185,48 +205,50 @@ class RECV(opcua_device):
RCU_LED_colour_R = attribute(dtype=(numpy.uint32,), max_dim_x=32, fisallowed="is_attribute_access_allowed")
def read_RCU_LED_colour_R(self):
return (2 * self.read_attribute("RCU_LED_green_on_R") + 4 * self.read_attribute("RCU_LED_red_on_R")).astype(numpy.uint32)
return (2 * self.read_attribute("RCU_LED_green_on_R") + 4 * self.read_attribute("RCU_LED_red_on_R")).astype(
numpy.uint32)
RCU_error_R = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_access_allowed")
ANT_error_R = attribute(dtype=(bool,), max_dim_x=96, fisallowed="is_attribute_access_allowed")
RCU_error_R = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_access_allowed")
ANT_error_R = attribute(dtype=(bool,), max_dim_x=96, fisallowed="is_attribute_access_allowed")
def read_RCU_error_R(self):
return self.read_attribute("RCU_mask_RW") & (
(self.read_attribute("RECVTR_I2C_error_R") > 0)
| self.alarm_val("RCU_PCB_ID_R")
)
(self.read_attribute("RECVTR_I2C_error_R") > 0)
| self.alarm_val("RCU_PCB_ID_R")
)
def read_ANT_error_R(self):
return self.read_attribute("ANT_mask_RW") & (
~self.read_attribute("RCU_ADC_locked_R")
)
~self.read_attribute("RCU_ADC_locked_R").flatten()
)
RECV_IOUT_error_R = attribute(dtype=(bool,), max_dim_x=96, fisallowed="is_attribute_access_allowed")
RECV_TEMP_error_R = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_access_allowed", polling_period=1000)
RECV_VOUT_error_R = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_access_allowed")
RECV_IOUT_error_R = attribute(dtype=(bool,), max_dim_x=96, fisallowed="is_attribute_access_allowed")
RECV_TEMP_error_R = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_access_allowed",
polling_period=1000)
RECV_VOUT_error_R = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_access_allowed")
def read_RECV_IOUT_error_R(self):
return self.read_attribute("ANT_mask_RW") & (
self.alarm_val("RCU_PWR_ANT_IOUT_R")
)
self.alarm_val("RCU_PWR_ANT_IOUT_R").flatten()
)
def read_RECV_TEMP_error_R(self):
# Don't apply the mask here --- we always want to know if things get too hot!
return (
self.alarm_val("RCU_TEMP_R")
)
self.alarm_val("RCU_TEMP_R")
)
def read_RECV_VOUT_error_R(self):
return (self.read_attribute("ANT_mask_RW") & (
self.alarm_val("RCU_PWR_ANT_VIN_R")
| self.alarm_val("RCU_PWR_ANT_VOUT_R")
)).reshape(32,3).any(axis=1) | (self.read_attribute("RCU_mask_RW") & (
self.alarm_val("RCU_PWR_1V8_R")
| self.alarm_val("RCU_PWR_2V5_R")
| self.alarm_val("RCU_PWR_3V3_R")
| ~self.read_attribute("RCU_PWR_DIGITAL_on_R")
| ~self.read_attribute("RCU_PWR_good_R")
))
self.alarm_val("RCU_PWR_ANT_VIN_R").flatten()
| self.alarm_val("RCU_PWR_ANT_VOUT_R").flatten()
)).reshape(32, 3).any(axis=1) | (self.read_attribute("RCU_mask_RW") & (
self.alarm_val("RCU_PWR_1V8_R")
| self.alarm_val("RCU_PWR_2V5_R")
| self.alarm_val("RCU_PWR_3V3_R")
| ~self.read_attribute("RCU_PWR_DIGITAL_on_R")
| ~self.read_attribute("RCU_PWR_good_R")
))
# --------
# overloaded functions
......@@ -295,8 +317,8 @@ class RECV(opcua_device):
def calculate_HBAT_bf_delay_steps(self, delays: numpy.ndarray):
""" converts a signal path delay (in seconds) to an analog beam weight """
# Reshape the flatten input array, into whatever how many tiles we get
delays = numpy.array(delays).reshape(-1,16)
# Reshape the flattened input array, into whatever how many tiles we get
delays = numpy.array(delays).reshape(-1, 16)
# Calculate the beam weight array
HBAT_bf_delay_steps = self._calculate_HBAT_bf_delay_steps(delays)
......@@ -348,6 +370,7 @@ class RECV(opcua_device):
"""
self.opcua_connection.call_method(["RCU_DTH_on"])
# ----------
# Run server
# ----------
......
......@@ -255,20 +255,20 @@ class TestAntennaFieldDevice(AbstractTestBases.TestDeviceBase):
antennafield_proxy.put_property(mapping_properties)
antennafield_proxy.boot()
self.recv_proxy.write_attribute("RCU_band_select_RW", [False] * 96)
self.recv_proxy.write_attribute("RCU_band_select_RW", [[False] * 3] * 32)
try:
antennafield_proxy.write_attribute(
"RCU_band_select_RW", [True] * 96
)
numpy.testing.assert_equal(
numpy.array([True] * 96),
numpy.array([[True] * 3] * 32),
self.recv_proxy.read_attribute("RCU_band_select_RW").value
)
finally:
# Always restore recv again
self.recv_proxy.write_attribute(
"RCU_band_select_RW", [False] * 96
"RCU_band_select_RW", [[False] * 3] * 32
)
# Verify device did not enter FAULT state
......
......@@ -92,15 +92,15 @@ class TestStatisticsWriterSST(BaseIntegrationTestCase):
# Test attribute values retrieval
collector.parse_device_attributes()
numpy.testing.assert_equal(
collector.parameters["rcu_attenuator_dB"].flatten(),
collector.parameters["rcu_attenuator_dB"],
self.recv_proxy.rcu_attenuator_dB_r
)
numpy.testing.assert_equal(
collector.parameters["rcu_band_select"].flatten(),
collector.parameters["rcu_band_select"],
self.recv_proxy.rcu_band_select_r.tolist()
)
numpy.testing.assert_equal(
collector.parameters["rcu_dth_on"].flatten(),
collector.parameters["rcu_dth_on"],
self.recv_proxy.rcu_dth_on_r.tolist()
)
......
......@@ -224,11 +224,35 @@ class TestAntennaToRecvMapper(base.TestCase):
actual = mapper.map_write("ANT_mask_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_pwr_ant_on_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [None] * 48
expected = [[[None, None, None]] * 32]
actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_pwr_ant_on_no_mapping_and_two_receivers(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [None] * 48
expected = [[[None, None, None]] * 32] * 2
actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_pwr_ant_on_hba_0_and_1_on_rcu_1_and_0_of_recv_1(self):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [1, 0] + [None] * 46
expected = [[[0, 1, None]] + [[None, None, None]] * 31]
actual = mapper.map_write("RCU_PWR_ANT_on_RW", set_values)
numpy.testing.assert_equal(expected, actual)
def test_map_write_rcu_band_select_no_mapping_and_one_receiver(self):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 1)
set_values = [None] * 48
expected = [[None] * 96]
expected = [[[None, None, None]] * 32]
actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual)
......@@ -236,7 +260,7 @@ class TestAntennaToRecvMapper(base.TestCase):
mapper = AntennaToRecvMapper(self.CONTROL_NOT_CONNECTED, self.POWER_NOT_CONNECTED, 2)
set_values = [None] * 48
expected = [[None] * 96] * 2
expected = [[[None, None, None]] * 32] * 2
actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual)
......@@ -244,7 +268,7 @@ class TestAntennaToRecvMapper(base.TestCase):
mapper = AntennaToRecvMapper(self.CONTROL_HBA_0_AND_1_ON_RCU_1_AND_0_OF_RECV_1, self.POWER_NOT_CONNECTED, 1)
set_values = [1, 0] + [None] * 46
expected = [[0, 1] + [None] * 94]
expected = [[[0, 1, None]] + [[None, None, None]] * 31]
actual = mapper.map_write("RCU_band_select_RW", set_values)
numpy.testing.assert_equal(expected, actual)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment