Select Git revision
antennafield_device.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
antennafield_device.py 35.27 KiB
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""AntennaField Abstract Device Server for LOFAR2.0"""
import logging
import datetime
from enum import IntEnum
from typing import List, Dict
import numpy
# PyTango imports
from tango import (
AttrWriteType,
DevVarBooleanArray,
DebugIt,
)
from tango.server import device_property, attribute, command
# Additional import
from tangostationcontrol.beam.geo import ETRS_to_ITRF
from tangostationcontrol.beam.geo import GEO_to_GEOHASH
from tangostationcontrol.beam.geo import ITRF_to_GEO
from tangostationcontrol.common.antennas import antenna_set_to_mask
from tangostationcontrol.common.cables import cable_types
from tangostationcontrol.common.constants import (
MAX_ANTENNA,
N_pol,
N_xyz,
N_latlong,
N_pn,
A_pn,
N_ANTENNA_SETS,
)
from tangostationcontrol.common.device_decorators import only_in_states, log_exceptions
from tangostationcontrol.common.frequency_bands import bands, Band
from tangostationcontrol.common.lofar_logging import (
device_logging_to_python,
)
from tangostationcontrol.common.proxies.proxy import create_device_proxy
from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice
from tangostationcontrol.devices.base_device_classes.mapper import (
MappedAttribute,
MappingKeys,
AntennaToSdpMapper,
RecvDeviceWalker,
)
from tangostationcontrol.devices.types import DeviceTypes
from tangostationcontrol.metrics import device_metrics
logger = logging.getLogger()
__all__ = ["AF", "AntennaUse", "AntennaStatus"]
class AntennaUse(IntEnum):
    """Requested usage of an antenna, as configured per antenna."""

    # Follow the antenna status: use it only if it's OK or SUSPICIOUS.
    AUTO = 0
    # Force the antenna to be on, regardless of its status.
    ON = 1
    # Force the antenna to be off, regardless of its status.
    OFF = 2
class AntennaStatus(IntEnum):
    """Operational status of an antenna.

    The numeric order matters: code elsewhere in this file compares a status
    against SUSPICIOUS with ``<=`` to decide whether an antenna is usable.
    """

    OK = 0
    SUSPICIOUS = 1
    BROKEN = 2
    BEYOND_REPAIR = 3
    NOT_AVAILABLE = 4
@device_logging_to_python()
@device_metrics(
exclude=[
"FPGA_sdp_info_*",
"*_RW",
],
include=[
"ANT_mask_RW",
"Frequency_Band_RW",
],
)
class AF(LOFARDevice):
"""
AntennaField Device is an abstract Device which serves as a common base
for HBA and LBA devices, containing all the common points and
the shared logic of these two devices.
Manages the antennas in a single antenna field, by acting as
a mapping onto one or more RECV devices.
The antenna field models a number of antennas, each of which
carries:
* a position in Antenna_Reference_ETRS/ITRF,
* a power mapping onto an RCU in Power_to_RECV_mapping,
* a control mapping onto an RCU in Control_to_RECV_mapping.
Furthermore, a central field reference position for the
antenna field is maintained in Antenna_Field_Reference_ETRS/ITRF.
For each position, it is best to provide the ETRS [x,y,z] position,
although this can be overruled through supplying an ITRF [x,y,z] position
instead. If not, the ITRF position is calculated by extrapolating the
tectonic shifts from the ETRS position, into the frame and epoch as
configured in ITRF_Reference_Frame and ITRF_Reference_Epoch.
From the ITRF positions, the geographical [lat,long] positions are
calculated, as well as the geohash.
"""
@classmethod
def get_mapped_attributes(cls, device_name) -> List[str]:
    """List the names of the MappedAttributes that map onto ``device_name``.

    :param device_name: value to match against each attribute's
        ``mapping_device``.
    :return: attribute names, in the order ``dir(cls)`` yields them.
    """
    return [
        name
        for name in dir(cls)
        # only class members that are MappedAttributes are candidates ...
        if isinstance(getattr(cls, name), MappedAttribute)
        # ... and of those, only the ones mapped onto the requested device
        and getattr(cls, name).mapping_device == device_name
    ]
@classmethod
def get_mapped_dimensions(cls, device_name) -> Dict[str, tuple]:
    """Map each mapped attribute name onto the tuple of its dimensions.

    :param device_name: device whose mapped attributes are inspected.
    :return: ``{name: (dim_x,)}`` for attributes with ``dim_y == 0``,
        ``{name: (dim_y, dim_x)}`` otherwise.
    """
    dimensions = {}

    for name in cls.get_mapped_attributes(device_name):
        mapped = getattr(cls, name)
        dim_x, dim_y = mapped.dim_x, mapped.dim_y

        if dim_y == 0:
            # no second dimension configured: a single dimension suffices
            dimensions[name] = (dim_x,)
        else:
            # note the order: y first, then x
            dimensions[name] = (dim_y, dim_x)

    return dimensions
# ----- Antenna names
# Optional; defaults to "Antenna1" .. "Antenna<MAX_ANTENNA>".
Antenna_Names = device_property(
    doc="Name of each antenna",
    dtype="DevVarStringArray",
    mandatory=False,
    default_value=[f"Antenna{n + 1}" for n in range(MAX_ANTENNA)],
)
# ----- Antenna set
# Names of the antenna sets; passed together with Antenna_Set_Masks to
# antenna_set_to_mask() by the command of the same name.
Antenna_Sets = device_property(
    doc="String representation of officially offered set of antennas, "
    "for use in digital beamforming.",
    dtype="DevVarStringArray",
    mandatory=False,
    default_value=["ALL"],
)
# One mask string per antenna set; the default is a single mask of
# MAX_ANTENNA "1" characters, matching the default "ALL" set.
Antenna_Set_Masks = device_property(
    doc="String encoding of the corresponding antenna masks for the antennafield, "
    "for use in digital beamforming. Excludes calibration antennas.",
    dtype="DevVarStringArray",
    mandatory=False,
    default_value=["1" * MAX_ANTENNA],
)
# ----- Antenna states
# Stored as integers; read_Antenna_Status_R converts them to AntennaStatus.
# Defaults to OK for every antenna.
Antenna_Status = device_property(
    doc="Operational status state of each antenna",
    dtype="DevVarUShortArray",
    mandatory=False,
    default_value=numpy.array(
        [AntennaStatus.OK] * MAX_ANTENNA, dtype=AntennaStatus
    ),
)
# Stored as integers; read_Antenna_Use_R converts them to AntennaUse.
# Defaults to AUTO for every antenna.
Antenna_Use = device_property(
    doc="Operational State of each antenna",
    dtype="DevVarUShortArray",
    mandatory=False,
    default_value=numpy.array([AntennaUse.AUTO] * MAX_ANTENNA, dtype=AntennaUse),
)
# ----- Antenna properties
# Defaults to True (powered) for every antenna; used by configure_recv to
# decide which of the used antennas get power.
Antenna_Needs_Power = device_property(
    doc="Whether to provide power to each antenna (False for noise sources)",
    dtype="DevVarBooleanArray",
    mandatory=False,
    default_value=numpy.array([True] * MAX_ANTENNA),
)
# Each entry is used as a key into cable_types to look up delay and loss.
Antenna_Cables = device_property(
    doc=f"Which cables connect each antenna to the RCU. Both polarisations are "
    f"assumed to be connected using the same type of cable. Needs to be "
    f"any of ({', '.join(cable_types.keys())}).",
    dtype="DevVarStringArray",
    mandatory=False,
    default_value=numpy.array(["0m"] * MAX_ANTENNA),
)
# Exposed read-only through Field_Attenuation_R.
Field_Attenuation = device_property(
    doc="Attenuation value to apply on all inputs.",
    dtype="DevShort",
    mandatory=False,
    default_value=10,
)
# ----- Position information
# For both the field and the per-antenna positions, the ITRF property takes
# precedence when set; otherwise the ITRF position is derived from the ETRS
# property (see read_Antenna_Field_Reference_ITRF_R and
# read_Antenna_Reference_ITRF_R).
Antenna_Field_Reference_ITRF = device_property(
    doc="ITRF position (XYZ) of each antenna field (leave empty to auto-derive "
    "from ETRS)",
    dtype="DevVarFloatArray",
    mandatory=False,
)
Antenna_Field_Reference_ETRS = device_property(
    doc="ETRS position (XYZ) of each antenna field",
    dtype="DevVarFloatArray",
    mandatory=False,
)
# Flattened; reshaped to (nr_antennas, N_xyz) when read.
Antenna_Reference_ITRF = device_property(
    doc="ITRF position (XYZ) of each Antenna (leave empty to auto-derive from "
    "ETRS)",
    dtype="DevVarFloatArray",
    mandatory=False,
)
# Flattened; reshaped to (nr_antennas, N_xyz) when read.
Antenna_Reference_ETRS = device_property(
    doc="ETRS position (XYZ) of each Antenna",
    dtype="DevVarFloatArray",
    mandatory=False,
)
# Passed on to ETRS_to_ITRF when deriving ITRF positions from ETRS.
ITRF_Reference_Frame = device_property(
    doc="Reference frame in which the ITRF coordinates are provided, or are to "
    "be computed from ETRS89",
    dtype="DevString",
    mandatory=False,
    default_value="ITRF2014",
)
# Epoch for the ITRF coordinates. ITRF_Reference_Epoch_R uses this value
# only if it is > 2000.0; otherwise (f.e. the default 0.0) it derives the
# epoch from the current date, rounded down to half a year.
ITRF_Reference_Epoch = device_property(
    doc="Reference epoch in which the ITRF coordinates are provided, or are to "
    "be extrapolated from ETRS89. If set to 0.0, the extrapolation from ETRS "
    "will be done rounded to the last half year (f.e. 2015.5 for november 2015)",
    dtype="DevFloat",
    mandatory=False,
    default_value=0.0,  # LOFAR1 used 2015.5,
)
# Stored flattened (9 values); PQR_to_ETRS_rotation_matrix_R reshapes it
# back into a 3x3 matrix.
PQR_to_ETRS_rotation_matrix = device_property(
    doc="Field-specific rotation matrix to convert PQR offsets to ETRS/ITRF "
    "offsets.",
    dtype="DevVarFloatArray",
    mandatory=False,
    default_value=numpy.array(
        [  # PQR->ETRS rotation matrix for the core stations
            [-0.1195951054, -0.7919544517, 0.5987530018],
            [0.9928227484, -0.0954186800, 0.0720990002],
            [0.0000330969, 0.6030782884, 0.7976820024],
        ]
    ).flatten(),
)
# ----- SDP mapping
# Flattened (fpga, input) pairs, one pair per antenna; reshaped to (-1, 2)
# on use. Defaults to all -1, i.e. every antenna unconnected.
Antenna_to_SDP_Mapping = device_property(
    dtype=(numpy.int32,),
    doc="The mapping of Antennas to FPGA input pairs. Each FPGA can handle 6 "
    "inputs, and SDP has 16 FPGAs. Each antenna is represented with a "
    "(fpga, input) value pair. The array is flattened, so must be reshaped "
    "upon use. An input=-1 means the antenna is unconnected.",
    default_value=numpy.array([-1] * MAX_ANTENNA * 2, dtype=numpy.int32),
)
# ----- RECV mapping
# Mandatory: a device proxy is created for every entry in
# _setup_all_proxies.
RECV_Devices = device_property(
    dtype=(str,),
    doc="The set of RECVL or RECVH devices to which the Power and Control mappings refer.",
    mandatory=True,
)
# NB: For HBA, the line providing the X pol carries the power,
# the line providing the Y pol carries the control.
# For LBA, both lines carry the power,
# the line providing the Y pol carries the control.
#
# Stored flattened as (RECV id, input) pairs, one pair per antenna. The
# number of pairs defines nr_antennas. The first value of each pair is the
# RECV device number, as used by read_Antenna_Cables_Loss_R.
Control_to_RECV_mapping = device_property(
    dtype=(numpy.int32,),
    doc="The mapping of Antenna control lines to RECV mapping. Each RECV can "
    "handle 96 inputs. The Antenna number is the index and the value shows "
    "to which receiver device it is connected and on which input. The "
    "first integer is the RECV id. The second integer is the input. "
    "Example: [1, 3] = STAT/RECVH/1 with input 3. -1 means that the Antenna "
    "is not connected. The property is stored in a one dimensional "
    "structure. It needs to be reshaped to a list of lists of two items.",
    mandatory=False,
    default_value=[-1] * MAX_ANTENNA * 2,
)
# ----- Defaults
# Optional default for the Frequency_Band_RW attribute.
# NOTE(review): how "<attribute>_default" properties are applied is not
# visible in this file; presumably handled by the base device class.
Frequency_Band_RW_default = device_property(
    dtype=(str,),
    mandatory=False,
)
# ----- Generic information
# Served by read_Antenna_Names_R: the first nr_antennas configured names.
Antenna_Names_R = attribute(
    access=AttrWriteType.READ,
    dtype=(str,),
    max_dim_x=MAX_ANTENNA,
)
# Served by read_Antenna_to_SDP_Mapping_R: the Antenna_to_SDP_Mapping
# property reshaped into one (fpga, input) row per antenna.
Antenna_to_SDP_Mapping_R = attribute(
    doc="To which (fpga, input) pair each antenna is connected. -1=unconnected.",
    dtype=((numpy.int32,),),
    max_dim_x=N_pol,
    max_dim_y=MAX_ANTENNA,
)
@attribute(
    access=AttrWriteType.READ,
    dtype=str,
    fisallowed="is_attribute_access_allowed",
)
def SDPFirmware_device_R(self):
    # Report the device name of our SDPFirmware child device.
    sdpfirmware = self.control.child(DeviceTypes.SDPFirmware)
    return sdpfirmware.dev_name()
@attribute(
    access=AttrWriteType.READ,
    dtype="DevShort",
)
def Field_Attenuation_R(self):
    # Expose the Field_Attenuation property as-is.
    field_attenuation = self.Field_Attenuation
    return field_attenuation
@attribute(
    access=AttrWriteType.READ,
    dtype=((numpy.int32,),),
    max_dim_x=N_pol,
    max_dim_y=MAX_ANTENNA,
)
def Control_to_RECV_mapping_R(self):
    # The property is stored flattened; expose it as one row of two values
    # per antenna.
    flat_mapping = numpy.array(self.Control_to_RECV_mapping)
    return flat_mapping.reshape(-1, 2)
@attribute(
    access=AttrWriteType.READ,
    dtype=(str,),
    max_dim_x=16,
)
def RECV_Devices_R(self):
    # Expose the configured RECV device names as a numpy array.
    recv_devices = numpy.array(self.RECV_Devices)
    return recv_devices
@attribute(
    access=AttrWriteType.READ,
    dtype=((numpy.float64,),),
    max_dim_x=3,
    max_dim_y=3,
)
def PQR_to_ETRS_rotation_matrix_R(self):
    # The property holds the 9 matrix elements flattened; restore 3x3.
    flat_matrix = numpy.array(self.PQR_to_ETRS_rotation_matrix, dtype=numpy.float64)
    return flat_matrix.reshape(3, 3)
# Served by read_Frequency_Band_RW / write_Frequency_Band_RW.
Frequency_Band_RW = attribute(
    doc="The selected frequency band of each polarisation of each antenna.",
    dtype=((str,),),
    max_dim_x=N_pol,
    max_dim_y=MAX_ANTENNA,
    access=AttrWriteType.READ_WRITE,
)
# ----- Cable information (between antenna and RCU)
# Served by read_Antenna_Cables_R: the first nr_antennas configured cables.
Antenna_Cables_R = attribute(
    doc=f"Which cables connect each antenna to the RCU. Both polarisations are "
    f"assumed to be connected using the same type of cable. Needs to be "
    f"any of ({', '.join(cable_types.keys())}).",
    dtype=(str,),
    max_dim_x=MAX_ANTENNA,
)
# Served by read_Antenna_Cables_Delay_R, which looks up the delay of each
# antenna's cable type.
Antenna_Cables_Delay_R = attribute(
    doc="Delay caused by the cable between antenna and RCU, in seconds.",
    dtype=(numpy.float64,),
    max_dim_x=MAX_ANTENNA,
    unit="s",
)
# Served by read_Antenna_Cables_Loss_R; depends on the selected RCU band.
Antenna_Cables_Loss_R = attribute(
    doc="Loss caused by the cable between antenna and RCU, in dB, for either polarisation.",
    dtype=((numpy.float64,),),
    max_dim_x=N_pol,
    max_dim_y=MAX_ANTENNA,
    unit="dB",
)
# ----- Status and usage information
Antenna_Needs_Power_R = attribute(
    doc="Whether the antenna should be powered at all (False for f.e. signal generators).",
    dtype=(bool,),
    max_dim_x=MAX_ANTENNA,
)
# Enum-typed view of the Antenna_Status property.
Antenna_Status_R = attribute(
    doc="The status of each antenna.",
    dtype=(AntennaStatus,),
    max_dim_x=MAX_ANTENNA,
)
# Same information as Antenna_Status_R, but as plain integers.
Antenna_Status_int_R = attribute(
    doc="The status of each antenna (as an integer). "
    "0=OK, 1=SUSPICIOUS, 2=BROKEN, 3=BEYOND_REPAIR, 4=NOT_AVAILABLE.",
    dtype=(numpy.uint32,),
    max_dim_x=MAX_ANTENNA,
)
# Enum-typed view of the Antenna_Use property. The AUTO rule described in
# the doc string matches read_Antenna_Usage_Mask_R, which uses AUTO antennas
# only if their status is <= SUSPICIOUS (so NOT_AVAILABLE is excluded too).
Antenna_Use_R = attribute(
    doc="Whether each antenna should be used. 0=AUTO, 1=ON, 2=OFF. In AUTO "
    "mode, the antenna is used if it is OK or SUSPICIOUS.",
    dtype=(AntennaUse,),
    max_dim_x=MAX_ANTENNA,
)
# Served by read_Antenna_Usage_Mask_R, which combines Antenna_Use_R and
# Antenna_Status_R.
Antenna_Usage_Mask_R = attribute(
    doc="Whether each antenna will be used.",
    dtype=(bool,),
    max_dim_x=MAX_ANTENNA,
)
# Exposes the Antenna_Sets property.
Antenna_Sets_R = attribute(
    doc="Officially offered set of antennas",
    dtype=(str,),
    max_dim_x=N_ANTENNA_SETS,
)
# Exposes the Antenna_Set_Masks property.
Antenna_Set_Masks_R = attribute(
    doc="String encoding of the corresponding antenna masks",
    dtype=(str,),
    max_dim_x=N_ANTENNA_SETS,
)
# ----- Attributes mapped on RECV
# These attributes are read and written through get_mapped_attribute /
# set_mapped_attribute, which fan out over all RECV proxies.
# NOTE(review): the first argument is presumably the name of the attribute
# on the mapped device, and the mapping device presumably defaults to RECV
# when "mapping_device" is omitted -- confirm against MappedAttribute.
ANT_mask_RW = MappedAttribute(
    "ANT_mask_RW",
    dtype=(bool,),
    max_dim_x=MAX_ANTENNA,
    access=AttrWriteType.READ_WRITE,
)
RCU_PWR_ANT_on_R = MappedAttribute(
    "RCU_PWR_ANT_on_R", dtype=(bool,), max_dim_x=MAX_ANTENNA
)
RCU_PWR_ANT_on_RW = MappedAttribute(
    "RCU_PWR_ANT_on_RW",
    dtype=(bool,),
    max_dim_x=MAX_ANTENNA,
    access=AttrWriteType.READ_WRITE,
)
# Per antenna, per polarisation.
RCU_band_select_R = MappedAttribute(
    "RCU_band_select_R",
    dtype=((numpy.int64,),),
    max_dim_x=N_pol,
    max_dim_y=MAX_ANTENNA,
)
RCU_band_select_RW = MappedAttribute(
    "RCU_band_select_RW",
    dtype=((numpy.int64,),),
    max_dim_x=N_pol,
    max_dim_y=MAX_ANTENNA,
    access=AttrWriteType.READ_WRITE,
)
# Per antenna, per polarisation.
RCU_attenuator_dB_R = MappedAttribute(
    "RCU_attenuator_dB_R",
    dtype=((numpy.int64,),),
    max_dim_x=N_pol,
    max_dim_y=MAX_ANTENNA,
    abs_change="1",
)
RCU_attenuator_dB_RW = MappedAttribute(
    "RCU_attenuator_dB_RW",
    dtype=((numpy.int64,),),
    max_dim_x=N_pol,
    max_dim_y=MAX_ANTENNA,
    access=AttrWriteType.READ_WRITE,
)
RCU_DTH_freq_R = MappedAttribute(
    "RCU_DTH_freq_R",
    dtype=(numpy.int64,),
    max_dim_x=MAX_ANTENNA,
    abs_change="1",
)
RCU_DTH_freq_RW = MappedAttribute(
    "RCU_DTH_freq_RW",
    dtype=(numpy.int64,),
    max_dim_x=MAX_ANTENNA,
    access=AttrWriteType.READ_WRITE,
)
RCU_DTH_on_R = MappedAttribute(
    "RCU_DTH_on_R",
    dtype=(bool,),
    max_dim_x=MAX_ANTENNA,
)
RCU_DTH_PWR_R = MappedAttribute(
    "RCU_DTH_PWR_R",
    dtype=(numpy.float64,),
    max_dim_x=MAX_ANTENNA,
    doc="RCU Dither source power (dBm). Range -25 to -4.",
)
RCU_DTH_PWR_RW = MappedAttribute(
    "RCU_DTH_PWR_RW",
    dtype=(numpy.float64,),
    max_dim_x=MAX_ANTENNA,
    access=AttrWriteType.READ_WRITE,
    doc="RCU Dither source power (dBm). Range -25 to -4.",
)
RCU_DAB_filter_on_R = MappedAttribute(
    "RCU_DAB_filter_on_R",
    dtype=(bool,),
    max_dim_x=MAX_ANTENNA,
)
RCU_DAB_filter_on_RW = MappedAttribute(
    "RCU_DAB_filter_on_RW",
    dtype=(bool,),
    max_dim_x=MAX_ANTENNA,
    access=AttrWriteType.READ_WRITE,
)
# Two values per antenna.
RCU_PCB_ID_R = MappedAttribute(
    "RCU_PCB_ID_R",
    dtype=((numpy.int64,),),
    max_dim_x=2,
    max_dim_y=MAX_ANTENNA,
    abs_change="1",
)
# Two values per antenna.
RCU_PCB_version_R = MappedAttribute(
    "RCU_PCB_version_R",
    dtype=((str,),),
    max_dim_x=2,
    max_dim_y=MAX_ANTENNA,
)
# ----- Attributes mapped on SDP
# mapping_device="SDP" routes these through the SDP branch of
# get_mapped_attribute / set_mapped_attribute, and makes them part of
# get_mapped_attributes("SDP").
FPGA_sdp_info_observation_id_R = MappedAttribute(
    "FPGA_sdp_info_observation_id_R",
    dtype=(numpy.uint32,),
    max_dim_x=MAX_ANTENNA,
    mapping_device="SDP",
)
FPGA_sdp_info_observation_id_RW = MappedAttribute(
    "FPGA_sdp_info_observation_id_RW",
    dtype=(numpy.uint32,),
    max_dim_x=MAX_ANTENNA,
    access=AttrWriteType.READ_WRITE,
    mapping_device="SDP",
)
FPGA_sdp_info_antenna_band_index_R = MappedAttribute(
    "FPGA_sdp_info_antenna_band_index_R",
    dtype=(numpy.uint32,),
    max_dim_x=MAX_ANTENNA,
    mapping_device="SDP",
)
FPGA_sdp_info_antenna_band_index_RW = MappedAttribute(
    "FPGA_sdp_info_antenna_band_index_RW",
    dtype=(numpy.uint32,),
    max_dim_x=MAX_ANTENNA,
    access=AttrWriteType.READ_WRITE,
    mapping_device="SDP",
)
# ----- Position information
# The ITRF attributes return the configured ITRF position if present, or
# one derived from ETRS otherwise. The GEO attributes are computed from the
# ITRF ones, and the GEOHASH attributes from the GEO ones.
Antenna_Field_Reference_ITRF_R = attribute(
    access=AttrWriteType.READ,
    doc="Absolute reference position of antenna field, in ITRF (XYZ)",
    dtype=(numpy.float64,),
    max_dim_x=N_xyz,
)
Antenna_Field_Reference_GEO_R = attribute(
    access=AttrWriteType.READ,
    doc="Absolute reference position of antenna field, "
    "in latitude/longitude (degrees)",
    dtype=(numpy.float64,),
    max_dim_x=N_latlong,
)
Antenna_Field_Reference_GEOHASH_R = attribute(
    access=AttrWriteType.READ,
    doc="Absolute reference position of antenna field, as a geohash string",
    dtype=str,
)
# One row per antenna.
Antenna_Reference_ITRF_R = attribute(
    access=AttrWriteType.READ,
    doc="Absolute reference position of each tile, in ITRF (XYZ)",
    dtype=((numpy.float64,),),
    max_dim_x=N_xyz,
    max_dim_y=MAX_ANTENNA,
)
# One row per antenna.
Antenna_Reference_GEO_R = attribute(
    access=AttrWriteType.READ,
    doc="Absolute reference position of each tile, in latitude/longitude (degrees)",
    dtype=((numpy.float64,),),
    max_dim_x=N_latlong,
    max_dim_y=MAX_ANTENNA,
)
# One geohash per antenna.
Antenna_Reference_GEOHASH_R = attribute(
    access=AttrWriteType.READ,
    doc="Absolute reference position of each tile, as geohash strings",
    dtype=(str,),
    max_dim_x=MAX_ANTENNA,
)
# Served by read_antenna_type_R, which returns self.ANTENNA_TYPE.
antenna_type_R = attribute(
    doc="Type of Antennas in this field (f.e. LBA or HBA)", dtype=str
)
# Served by read_nr_antennas_R, which returns the nr_antennas property.
nr_antennas_R = attribute(doc="Number of Antennas in this field", dtype=numpy.int32)
@attribute(
    doc="Epoch used to extrapolate the ITRF model to.",
    dtype=numpy.float64,
    unit="years",
)
def ITRF_Reference_Epoch_R(self):
    # An explicitly configured epoch (anything beyond the year 2000) wins.
    configured_epoch = self.ITRF_Reference_Epoch
    if configured_epoch > 2000.0:
        return configured_epoch

    # Otherwise derive the epoch from today's date: the current year as a
    # float, rounded down to half a year (January-June -> .0,
    # July-December -> .5).
    today = datetime.datetime.now()
    half_year = 0.0 if today.month <= 6 else 0.5
    return numpy.float64(today.year + half_year)
def __init__(self, cl, name):
    """Prepare the proxy placeholders, then initialise the base device."""
    # These members must exist BEFORE the base class constructor runs,
    # because that constructor executes init_device.
    self.recv_proxies = []
    self.sdp_proxy = None
    self.sdpfirmware_proxy = None

    super().__init__(cl, name)
@property
def nr_antennas(self):
    """Number of antennas in this field.

    The following should all describe the same number of antennas:

    * Control_to_RECV_mapping (after reshaping),
    * Power_to_RECV_mapping (after reshaping),
    * the antennas exposed through Antenna_Reference_ITRF_R,
    * Antenna_Use,
    * Antenna_Status,
    * Antenna_to_SDP_Mapping.

    Parsing a property is quickest, so Control_to_RECV_mapping is used.
    """
    # the mapping is stored flattened, with two values per antenna
    nr_mapping_values = len(self.Control_to_RECV_mapping)
    return nr_mapping_values // 2
def read_Antenna_Names_R(self):
    """Return the configured names of the first ``nr_antennas`` antennas."""
    all_names = numpy.array(self.Antenna_Names)
    return all_names[: self.nr_antennas]
@command(dtype_in=str, dtype_out=DevVarBooleanArray)
def antenna_set_to_mask(self, antenna_set):
    """Translate the antenna set string code into the corresponding
    antenna mask array"""
    # NB: inside this method, the name "antenna_set_to_mask" resolves to
    # the module-level helper function, not to this command.
    nr_antennas = self.nr_antennas
    known_sets = self.Antenna_Sets
    known_masks = self.Antenna_Set_Masks

    return antenna_set_to_mask(antenna_set, nr_antennas, known_sets, known_masks)
def read_Frequency_Band_RW(self):
    """Derive the frequency band from the RECV band selection and SDP clock.

    Combinations for which no band is known are reported as an
    ``<unsupported: ...>`` string instead of raising.
    """
    antenna_type = self.ANTENNA_TYPE

    # settings from RECV
    rcu_band_select = self.read_attribute("RCU_band_select_RW")

    # settings from SDP
    clock = self.sdpfirmware_proxy.clock_RW

    # NB: the local names antenna_type, clock and rcu_band are part of the
    # "unsupported" message through the "{name=}" format specifiers.
    @numpy.vectorize
    def band_of(rcu_band):
        try:
            return Band.lookup_rcu_band(antenna_type, clock, rcu_band)
        except ValueError:
            # unknown configuration
            return f"<unsupported: {antenna_type=} {clock=} {rcu_band=}>"

    # apply the lookup to every element of the band selection
    return band_of(rcu_band_select)
def write_Frequency_Band_RW(self, value):
    """Select the frequency band for each polarisation of each antenna.

    Validates the requested bands, then configures the RCU band selection
    on RECV, the clock on SDPFirmware, and the nyquist zones on SDP.

    :param value: 2D collection of band names, indexed as
        [antenna][polarisation].
    :raises ValueError: if a band is unknown, is meant for a different
        antenna type, or if the bands do not all use the same clock.
    """
    # use numpy for easy processing
    value = numpy.array(value)

    # validate content
    for val in value.flatten():
        if val not in bands:
            raise ValueError(
                f"Unsupported frequency band: {val}. Must be one of {list(bands)}."
            )

        # Messages are built with implicit string concatenation. A backslash
        # continuation inside the literal would embed the next source line's
        # leading whitespace into the message.
        if bands[val].antenna_type != self.ANTENNA_TYPE:
            raise ValueError(
                f"Unsupported frequency band for our antenna type: {val} "
                f"is for {bands[val].antenna_type}, but we are {self.ANTENNA_TYPE}."
            )

        # NB: "value[0, 0] in bands" holds at this point, since the first
        # iteration of this loop validated it.
        if bands[val].clock != bands[value[0, 0]].clock:
            raise ValueError(
                "All frequency bands must use the same clock. "
                f"These do not: {val} and {value[0, 0]}."
            )

    # convert into settings for RECV
    self.proxy.RCU_band_select_RW = numpy.vectorize(
        lambda band: bands[band].rcu_band
    )(value)

    # apply settings on SDP; all bands share one clock, as validated above
    self.sdpfirmware_proxy.clock_RW = bands[value[0, 0]].clock

    # read-modify-write on [fpga][(input, polarisation)]; entries left at
    # None are the ones we do not set here
    sdp_nyquist_zone = numpy.full((N_pn, A_pn * N_pol), None)
    for antenna_nr, (fpga_nr, input_nr) in enumerate(
        self.read_attribute("Antenna_to_SDP_Mapping_R")
    ):
        # input -1 means the antenna is not connected to SDP
        if input_nr == -1:
            continue

        # set for x polarisation
        sdp_nyquist_zone[fpga_nr, input_nr * 2 + 0] = bands[
            value[antenna_nr, 0]
        ].nyquist_zone
        # set for y polarisation
        sdp_nyquist_zone[fpga_nr, input_nr * 2 + 1] = bands[
            value[antenna_nr, 1]
        ].nyquist_zone

    self.atomic_read_modify_write_attribute(
        sdp_nyquist_zone, self.sdp_proxy, "nyquist_zone_RW", None, numpy.uint32
    )
def read_Antenna_Cables_R(self):
    """Return the configured cable type of the first ``nr_antennas`` antennas."""
    all_cables = numpy.array(self.Antenna_Cables)
    return all_cables[: self.nr_antennas]
def read_Antenna_Cables_Delay_R(self):
    """Return the delay of the cable type of each antenna."""
    delays = [cable_types[cable].delay for cable in self.read_Antenna_Cables_R()]
    return numpy.array(delays)
def read_Antenna_Cables_Loss_R(self):
    """Return the cable loss per antenna, for both polarisations.

    The loss depends on the cable type and on the RCU band currently
    selected for each polarisation.
    """
    rcu_bands = self.read_attribute("RCU_band_select_RW")

    # first column of the (reshaped) control mapping is the RECV device number
    recvs = numpy.array(self.Control_to_RECV_mapping).reshape(-1, 2)[:, 0]

    losses = []
    for recv, cable, rcu_band in zip(recvs, self.Antenna_Cables, rcu_bands):
        if recv > 0:
            cable_type = cable_types[cable]
            losses.append(
                [
                    cable_type.get_loss(self.ANTENNA_TYPE, rcu_band[0]),
                    cable_type.get_loss(self.ANTENNA_TYPE, rcu_band[1]),
                ]
            )
        else:
            # Unconnected antennas return RCU band 0, which does not exist.
            # Report 0 loss for them instead.
            losses.append([0, 0])

    return numpy.array(losses)
def read_Antenna_Needs_Power_R(self):
    """Return the power requirement of the first ``nr_antennas`` antennas."""
    all_needs_power = numpy.array(self.Antenna_Needs_Power)
    return all_needs_power[: self.nr_antennas]
def read_Antenna_Use_R(self):
    """Return the configured use of each antenna, as AntennaUse values."""
    raw_use = numpy.array(self.Antenna_Use)[: self.nr_antennas]
    as_enums = [AntennaUse(raw) for raw in raw_use]
    return numpy.array(as_enums, dtype=AntennaUse)
def read_Antenna_Status_R(self):
    """Return the configured status of each antenna, as AntennaStatus values."""
    raw_status = numpy.array(self.Antenna_Status)[: self.nr_antennas]
    as_enums = [AntennaStatus(raw) for raw in raw_status]
    return numpy.array(as_enums, dtype=AntennaStatus)
def read_Antenna_Status_int_R(self):
    """Return the status of each antenna as unsigned 32-bit integers."""
    status_values = [status.value for status in self.read_Antenna_Status_R()]
    return numpy.array(status_values, dtype=numpy.uint32)
def read_Antenna_Usage_Mask_R(self):
    """Return, per antenna, whether it will be used.

    An antenna is used if its use is ON, or if its use is AUTO and its
    status is at most SUSPICIOUS.
    """
    use = self.read_Antenna_Use_R()
    status = self.read_Antenna_Status_R()

    forced_on = use == AntennaUse.ON

    left_to_status = use == AntennaUse.AUTO
    status_is_usable = status <= AntennaStatus.SUSPICIOUS
    auto_on = numpy.logical_and(left_to_status, status_is_usable)

    return numpy.logical_or(forced_on, auto_on)
def read_Antenna_Sets_R(self):
    """Return the Antenna_Sets property unmodified."""
    antenna_sets = self.Antenna_Sets
    return antenna_sets
def read_Antenna_Set_Masks_R(self):
    """Return the Antenna_Set_Masks property unmodified."""
    antenna_set_masks = self.Antenna_Set_Masks
    return antenna_set_masks
def read_Antenna_to_SDP_Mapping_R(self):
    """Return the SDP mapping as one (fpga, input) row per antenna."""
    flat_mapping = numpy.array(self.Antenna_to_SDP_Mapping)
    return flat_mapping.reshape(-1, 2)
def read_nr_antennas_R(self):
    """Return the number of antennas in this field."""
    nr_antennas = self.nr_antennas
    return nr_antennas
def read_antenna_type_R(self):
    """Return the antenna type of this field (``self.ANTENNA_TYPE``)."""
    antenna_type = self.ANTENNA_TYPE
    return antenna_type
def read_Antenna_Field_Reference_ITRF_R(self):
    """Return the ITRF reference position of the antenna field.

    Configured ITRF coordinates take precedence; otherwise they are
    derived from the configured ETRS coordinates.
    """
    if not self.Antenna_Field_Reference_ITRF:
        # no ITRF coordinates configured: calculate them from the ETRS
        # coordinates, using the configured ITRF reference frame and epoch
        etrs_position = numpy.array(self.Antenna_Field_Reference_ETRS).reshape(N_xyz)
        reference_epoch = self.read_attribute("ITRF_Reference_Epoch_R")

        return ETRS_to_ITRF(etrs_position, self.ITRF_Reference_Frame, reference_epoch)

    # provide the ITRF field coordinates as they were configured
    return numpy.array(self.Antenna_Field_Reference_ITRF).reshape(N_xyz)
def read_Antenna_Field_Reference_GEO_R(self):
    """Return the field reference position converted from ITRF to GEO."""
    itrf_position = self.read_Antenna_Field_Reference_ITRF_R()
    return ITRF_to_GEO(itrf_position)
def read_Antenna_Field_Reference_GEOHASH_R(self):
    """Return the field reference position converted from GEO to a geohash."""
    geo_position = self.read_Antenna_Field_Reference_GEO_R()
    return GEO_to_GEOHASH(geo_position)
def read_Antenna_Reference_ITRF_R(self):
    """Return the ITRF reference position of each antenna.

    Configured ITRF coordinates take precedence; otherwise they are
    derived from the configured ETRS coordinates.
    """
    if not self.Antenna_Reference_ITRF:
        # no ITRF coordinates configured: calculate them from the ETRS
        # coordinates, using the configured ITRF reference frame and epoch
        etrs_positions = numpy.array(self.Antenna_Reference_ETRS).reshape(
            self.nr_antennas, N_xyz
        )
        reference_epoch = self.read_attribute("ITRF_Reference_Epoch_R")

        return ETRS_to_ITRF(etrs_positions, self.ITRF_Reference_Frame, reference_epoch)

    # provide the ITRF coordinates as they were configured
    itrf_positions = numpy.array(self.Antenna_Reference_ITRF)
    return itrf_positions.reshape(self.nr_antennas, N_xyz)
def read_Antenna_Reference_GEO_R(self):
    """Return the antenna reference positions converted from ITRF to GEO."""
    itrf_positions = self.read_Antenna_Reference_ITRF_R()
    return ITRF_to_GEO(itrf_positions)
def read_Antenna_Reference_GEOHASH_R(self):
    """Return the antenna reference positions converted from GEO to geohashes."""
    geo_positions = self.read_Antenna_Reference_GEO_R()
    return GEO_to_GEOHASH(geo_positions)
def _setup_all_proxies(self):
    """(Re)create the proxies to the RECV, SDPFirmware and SDP devices."""
    # RECV: one writable proxy per configured device, in configured order.
    # The existing list object is emptied and refilled, not replaced.
    self.recv_proxies.clear()
    self.recv_proxies.extend(
        create_device_proxy(recv_name, write_access=True)
        for recv_name in self.RECV_Devices
    )

    # SDPFirmware is a child device of ours; it tells us which SDP device
    # to use.
    sdpfirmware = self.control.child(DeviceTypes.SDPFirmware)
    self.sdpfirmware_proxy = sdpfirmware
    self.sdp_proxy = self.control.child(sdpfirmware.SDP_device_R)
def _setup_recv_mapper(self):
    """Create the mapper that translates between antennas and RECV inputs."""
    # Properties are stored as 1D arrays, so restore the rows of two values
    control_pairs = numpy.reshape(self.Control_to_RECV_mapping, (-1, 2))
    power_pairs = numpy.reshape(self.Power_to_RECV_mapping, (-1, 2))

    self._recv_mapper = self._get_recv_mapper(
        {MappingKeys.CONTROL: control_pairs, MappingKeys.POWER: power_pairs},
        len(self.recv_proxies),
    )
def _get_recv_mapper(self, recv_mapping, nr_recv_devices):
    """Provide the RECV mapper based on the antenna type.

    Not implemented in this class: it always raises NotImplementedError.

    :param recv_mapping: dict holding the CONTROL and POWER mappings, as
        built by _setup_recv_mapper.
    :param nr_recv_devices: number of RECV proxies.
    """
    raise NotImplementedError
def _setup_sdp_mapper(self):
    """Create the mapper that translates between antennas and SDP inputs."""
    # Properties are stored as 1D arrays, so restore the (fpga, input) rows
    fpga_input_pairs = numpy.reshape(self.Antenna_to_SDP_Mapping, (-1, 2))
    sdp_dimensions = self.get_mapped_dimensions("SDP")

    self.__sdp_mapper = AntennaToSdpMapper(
        {MappingKeys.FPGA: fpga_input_pairs},
        sdp_dimensions,
    )
def get_mapped_attribute(self, mapped_point: str, mapped_device: str):
    """Read method implementation of the MappedAttribute class

    :param mapped_point: name of the attribute to read on the mapped device.
    :param mapped_device: "SDP" to read from SDP; anything else reads from
        all RECV devices.
    :return: the values as returned by the mapper's ``map_read``.
    """
    if mapped_device != "SDP":
        # RECV: collect the value from every RECV device, then map
        per_recv_values = [
            recv_proxy.read_attribute(mapped_point).value
            for recv_proxy in self.recv_proxies
        ]
        return self._recv_mapper.map_read(mapped_point, per_recv_values)

    # SDP: a single device holds all values
    sdp_values = self.sdp_proxy.read_attribute(mapped_point).value
    return self.__sdp_mapper.map_read(mapped_point, sdp_values)
def set_mapped_attribute(
    self, mapped_point: str, value, cast_type: type, mapped_device: str
):
    """Set the attribute to new value only for controlled points

    :param mapped_point: name of the attribute to write on the mapped device.
    :param value: the new values.
    :param cast_type: passed on to atomic_read_modify_write_attribute.
    :param mapped_device: "SDP" to write to SDP; anything else writes to
        all RECV devices.

    :warning: This method is susceptible to a lost update race condition if the
        attribute on the RECVH/RECVL device is written to in between `read_attribute`
        and `write_attribute`!
    """
    if mapped_device == "SDP":
        # sparse multidimensional array, uncontrolled values set to None
        sparse_values = self.__sdp_mapper.map_write(mapped_point, value)
        self.atomic_read_modify_write_attribute(
            sparse_values, self.sdp_proxy, mapped_point, cast_type=cast_type
        )
        return

    # sparse multidimensional array, uncontrolled values set to None;
    # the first index selects the RECV device
    sparse_values = self._recv_mapper.map_write(mapped_point, value)
    for recv_nr, recv_proxy in enumerate(self.recv_proxies):
        self.atomic_read_modify_write_attribute(
            sparse_values[recv_nr], recv_proxy, mapped_point, cast_type=cast_type
        )
# --------
# Overloaded functions
# --------
@log_exceptions()
def configure_for_initialise(self):
    # Let the base class configure itself first, then set up the proxies
    # and the mappers. The proxies go first: the RECV mapper needs to
    # know how many RECV proxies there are.
    super().configure_for_initialise()

    for setup_step in (
        self._setup_all_proxies,
        self._setup_recv_mapper,
        self._setup_sdp_mapper,
    ):
        setup_step()
def _read_hardware_powered_fraction_R(self):
    """Read attribute which monitors the power

    :return: the fraction of the antennas in ANT_mask_RW that is powered
        according to RCU_PWR_ANT_on_R, or 1.0 if the mask selects none.
    """
    controlled = self.read_attribute("ANT_mask_RW")
    powered_on = self.read_attribute("RCU_PWR_ANT_on_R")

    try:
        nr_powered = numpy.count_nonzero(powered_on & controlled)
        return nr_powered / numpy.count_nonzero(controlled)
    except ZeroDivisionError:
        # nothing is selected by the mask
        return 1.0
def _power_hardware_on(self):
    """Configure the devices that process our antennas: RECV, then SDP."""
    for configure in (self.configure_recv, self.configure_sdp):
        configure()
def power_antennas_off(self, mask: numpy.ndarray):
    """Power the specified antennas off, do not touch other antennas.

    :param mask: a boolean array indicating which antennas should be
        powered OFF.
    """
    # Remember the mask as it is now, so it can be put back afterwards
    original_mask = self.read_attribute("ANT_mask_RW")

    # Enable controlling all requested antennas
    self.proxy.write_attribute("ANT_mask_RW", mask)

    try:
        # Keep the current power setting of every antenna outside the
        # requested mask, and turn off every antenna inside it
        keep_as_is = numpy.logical_not(mask)
        current_power = self.read_attribute("RCU_PWR_ANT_on_RW")
        new_power = numpy.logical_and(current_power, keep_as_is)

        self.proxy.write_attribute("RCU_PWR_ANT_on_RW", new_power)
    finally:
        # Restore original mask, also if the above failed
        self.proxy.write_attribute("ANT_mask_RW", original_mask)
def _power_hardware_off(self):
    """Power all antennas of this field off."""
    every_antenna = [True] * self.nr_antennas
    self.power_antennas_off(every_antenna)
# --------
# Commands
# --------
@command()
@only_in_states(DEFAULT_COMMAND_STATES)
@DebugIt()
@log_exceptions()
def configure_recv(self):
    """Configure RECV to process our antennas."""
    # Read the usage mask once, so all steps below act on the same
    # snapshot instead of fetching the same attribute three times.
    usage_mask = self.read_attribute("Antenna_Usage_Mask_R")

    # Power off what should be off, f.e. if they got turned off in the mask
    # but are still powered.
    self.power_antennas_off(numpy.logical_not(usage_mask))

    # Disable controlling the antennas that fall outside the mask
    self.proxy.write_attribute("ANT_mask_RW", usage_mask)

    # Turn on power to antennas that we use, and that need it
    self.proxy.write_attribute(
        "RCU_PWR_ANT_on_RW",
        numpy.logical_and(
            self.read_attribute("Antenna_Needs_Power_R"),
            usage_mask,
        ),
    )
# NB: This command has no body beyond its docstring, so it does nothing in
# this class. It is called from _power_hardware_on after configure_recv.
@command()
@only_in_states(DEFAULT_COMMAND_STATES)
@DebugIt()
@log_exceptions()
def configure_sdp(self):
    """Configure SDP to process our antennas."""
@command()
@DebugIt()
@only_in_states(DEFAULT_COMMAND_STATES)
def RCU_DTH_setup(self):
    # Forward RCU_DTH_setup to RECV devices. The walker is given the
    # control mapping and the usage mask.
    # NOTE(review): presumably it visits only receivers of used antennas
    # -- confirm against RecvDeviceWalker.
    control_mapping = self.read_attribute("Control_to_RECV_mapping_R")
    usage_mask = self.read_attribute("Antenna_Usage_Mask_R")

    receiver_walker = RecvDeviceWalker(control_mapping, usage_mask)
    receiver_walker.walk_receivers(
        self.recv_proxies, lambda recv: recv.RCU_DTH_setup()
    )
@command()
@DebugIt()
@only_in_states(DEFAULT_COMMAND_STATES)
def RCU_DTH_on(self):
    # Forward RCU_DTH_on to RECV devices. The walker is given the
    # control mapping and the usage mask.
    # NOTE(review): presumably it visits only receivers of used antennas
    # -- confirm against RecvDeviceWalker.
    control_mapping = self.read_attribute("Control_to_RECV_mapping_R")
    usage_mask = self.read_attribute("Antenna_Usage_Mask_R")

    receiver_walker = RecvDeviceWalker(control_mapping, usage_mask)
    receiver_walker.walk_receivers(self.recv_proxies, lambda recv: recv.RCU_DTH_on())
@command()
@DebugIt()
@only_in_states(DEFAULT_COMMAND_STATES)
def RCU_DTH_off(self):
    # Forward RCU_DTH_off to RECV devices. The walker is given the
    # control mapping and the usage mask.
    # NOTE(review): presumably it visits only receivers of used antennas
    # -- confirm against RecvDeviceWalker.
    control_mapping = self.read_attribute("Control_to_RECV_mapping_R")
    usage_mask = self.read_attribute("Antenna_Usage_Mask_R")

    receiver_walker = RecvDeviceWalker(control_mapping, usage_mask)
    receiver_walker.walk_receivers(self.recv_proxies, lambda recv: recv.RCU_DTH_off())