Newer
Older
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
""" RECV Abstract Device Server for LOFAR2.0

Hannes Feldt
committed
import logging

Hannes Feldt
committed
import numpy
from attribute_wrapper.attribute_wrapper import AttributeWrapper
from tango import AttrWriteType, DevString, DevLong

Hannes Feldt
committed
# PyTango imports
from tango import DebugIt
from tango.server import command, device_property, attribute

Jan David Mol
committed
# Additional import

Hannes Feldt
committed
from tangostationcontrol.common.constants import (
N_rcu,
N_rcu_inp,
)
from tangostationcontrol.common.frequency_bands import bands
from tangostationcontrol.common.lofar_logging import device_logging_to_python
from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
from tangostationcontrol.devices.base_device_classes.opcua_device import OPCUADevice
from tangostationcontrol.common.device_decorators import only_in_states
from tangostationcontrol.metrics import device_metrics
__all__ = ["RECVDevice"]
Two types of receiver: RECVL and RECVH

Jan David Mol
committed
@device_logging_to_python()
@device_metrics(
exclude=[
"*_RW",
class RECVDevice(OPCUADevice):
"""
RECV Device is an abstract Device which serves as a common base
for RECVL and RECVH devices, containing all the common points and
the shared logic of these two devices.
"""
# -----------------
# Device Properties
# -----------------

Jan David Mol
committed
# ----- Default settings

Jan David Mol
committed
ANT_mask_RW_default = device_property(

Hannes Feldt
committed
dtype="DevVarBooleanArray",

Hannes Feldt
committed
default_value=[True] * N_rcu * N_rcu_inp,
RECVTR_monitor_rate_RW_default = device_property(
dtype="DevLong64", mandatory=False, default_value=1
)
RCU_mask_RW_default = device_property(

Hannes Feldt
committed
dtype="DevVarBooleanArray", mandatory=False, default_value=[True] * N_rcu
RCU_attenuator_dB_RW_default = device_property(

Hannes Feldt
committed
dtype="DevVarLong64Array",

Hannes Feldt
committed
default_value=[0] * N_rcu * N_rcu_inp,

Jan David Mol
committed
RCU_band_select_RW_default = device_property(

Hannes Feldt
committed
dtype="DevVarLong64Array",

Jan David Mol
committed
mandatory=False,

Hannes Feldt
committed
default_value=[1] * N_rcu * N_rcu_inp,

Jan David Mol
committed
)
RCU_PWR_ANT_on_RW_default = device_property(

Hannes Feldt
committed
dtype="DevVarBooleanArray",

Jan David Mol
committed
mandatory=False,
# turn power off by default in test setups, f.e. to prevent blowing up the noise sources

Jan David Mol
committed
)
TRANSLATOR_DEFAULT_SETTINGS = [

Hannes Feldt
committed
"ANT_mask_RW",
"RCU_mask_RW",
"RECVTR_monitor_rate_RW",

Jan David Mol
committed
]

Jan David Mol
committed
# ----- Timing values
RCU_On_Off_timeout = device_property(

Hannes Feldt
committed
doc="Maximum amount of time to wait after turning RCU(s) on or off",
dtype="DevFloat",

Jan David Mol
committed
mandatory=False,

Jan David Mol
committed
)
RCU_DTH_On_Off_timeout = device_property(

Hannes Feldt
committed
doc="Maximum amount of time to wait after turning dithering on or off",
dtype="DevFloat",

Jan David Mol
committed
mandatory=False,

Hannes Feldt
committed
default_value=30.0,

Jan David Mol
committed
)

Jan David Mol
committed
TR_software_version_R = AttributeWrapper(
comms_annotation=["TR_software_version_R"], datatype=str
)
ANT_mask_RW = AttributeWrapper(
doc="Which antennas are physically connected.",

Hannes Feldt
committed
comms_annotation=["ANT_mask_RW"],
datatype=bool,
dims=(N_rcu, N_rcu_inp),
access=AttrWriteType.READ_WRITE,
RECVTR_monitor_rate_RW = AttributeWrapper(
comms_annotation=["RECVTR_monitor_rate_RW"],

Hannes Feldt
committed
datatype=numpy.int64,
access=AttrWriteType.READ_WRITE,
RCU_mask_RW = AttributeWrapper(
comms_annotation=["RCU_mask_RW"],

Hannes Feldt
committed
datatype=bool,

Hannes Feldt
committed
access=AttrWriteType.READ_WRITE,
RECVTR_I2C_error_R = AttributeWrapper(
comms_annotation=["RECVTR_I2C_error_R"], datatype=numpy.int64, dims=(N_rcu,)
RECVTR_translator_busy_R = AttributeWrapper(
comms_annotation=["RECVTR_translator_busy_R"], datatype=bool
RCU_attenuator_dB_R = AttributeWrapper(

Hannes Feldt
committed
comms_annotation=["RCU_attenuator_dB_R"],
datatype=numpy.int64,
dims=(N_rcu, N_rcu_inp),
)
RCU_attenuator_dB_RW = AttributeWrapper(

Hannes Feldt
committed
comms_annotation=["RCU_attenuator_dB_RW"],
datatype=numpy.int64,
dims=(N_rcu, N_rcu_inp),
access=AttrWriteType.READ_WRITE,
RCU_band_select_R = AttributeWrapper(

Hannes Feldt
committed
comms_annotation=["RCU_band_select_R"],
datatype=numpy.int64,
dims=(N_rcu, N_rcu_inp),
)
RCU_band_select_RW = AttributeWrapper(

Hannes Feldt
committed
comms_annotation=["RCU_band_select_RW"],
datatype=numpy.int64,
dims=(N_rcu, N_rcu_inp),
access=AttrWriteType.READ_WRITE,
RCU_LED_red_on_R = AttributeWrapper(

Hannes Feldt
committed
comms_annotation=["RCU_LED_red_on_R"], datatype=bool, dims=(N_rcu,)
)
RCU_LED_red_on_RW = AttributeWrapper(

Hannes Feldt
committed
comms_annotation=["RCU_LED_red_on_RW"],
datatype=bool,
dims=(N_rcu,),
access=AttrWriteType.READ_WRITE,
RCU_LED_green_on_R = AttributeWrapper(
comms_annotation=["RCU_LED_green_on_R"], datatype=bool, dims=(N_rcu,)
)
RCU_LED_green_on_RW = AttributeWrapper(
comms_annotation=["RCU_LED_green_on_RW"],

Hannes Feldt
committed
datatype=bool,
dims=(N_rcu,),
access=AttrWriteType.READ_WRITE,
)
RCU_TEMP_R = AttributeWrapper(
comms_annotation=["RCU_TEMP_R"], datatype=numpy.float64, dims=(N_rcu,)

Hannes Feldt
committed
)
RCU_PWR_3V3_R = AttributeWrapper(
comms_annotation=["RCU_PWR_3V3_R"], datatype=numpy.float64, dims=(N_rcu,)

Hannes Feldt
committed
)

Hannes Feldt
committed
RCU_PWR_1V8_R = AttributeWrapper(
comms_annotation=["RCU_PWR_1V8_R"], datatype=numpy.float64, dims=(N_rcu,)
)

Hannes Feldt
committed
RCU_PWR_2V5_R = AttributeWrapper(
comms_annotation=["RCU_PWR_2V5_R"], datatype=numpy.float64, dims=(N_rcu,)
)
RCU_PWR_ANT_VOUT_R = AttributeWrapper(
comms_annotation=["RCU_PWR_ANT_VOUT_R"],
datatype=numpy.float64,
dims=(N_rcu, N_rcu_inp),

Hannes Feldt
committed
)
RCU_PWR_ANT_VIN_R = AttributeWrapper(
comms_annotation=["RCU_PWR_ANT_VIN_R"],
datatype=numpy.float64,
dims=(N_rcu, N_rcu_inp),

Hannes Feldt
committed
)

Hannes Feldt
committed
RCU_PWR_ANT_IOUT_R = AttributeWrapper(
comms_annotation=["RCU_PWR_ANT_IOUT_R"],
datatype=numpy.float64,
dims=(N_rcu, N_rcu_inp),
)
RCU_PWR_DIGITAL_on_R = AttributeWrapper(
comms_annotation=["RCU_PWR_DIGITAL_on_R"], datatype=bool, dims=(N_rcu,)
)
RCU_PWR_good_R = AttributeWrapper(
comms_annotation=["RCU_PWR_good_R"], datatype=bool, dims=(N_rcu,)
)
RCU_PWR_ANALOG_on_R = AttributeWrapper(
comms_annotation=["RCU_PWR_ANALOG_on_R"], datatype=bool, dims=(N_rcu,)
)

Hannes Feldt
committed
RCU_PWR_ANT_on_R = AttributeWrapper(
comms_annotation=["RCU_PWR_ANT_on_R"], datatype=bool, dims=(N_rcu, N_rcu_inp)
)
RCU_PWR_ANT_on_RW = AttributeWrapper(
comms_annotation=["RCU_PWR_ANT_on_RW"],
datatype=bool,
dims=(N_rcu, N_rcu_inp),
access=AttrWriteType.READ_WRITE,
)
RCU_PCB_ID_R = AttributeWrapper(
comms_annotation=["RCU_PCB_ID_R"], datatype=numpy.int64, dims=(N_rcu,)

Hannes Feldt
committed
)
RCU_PCB_version_R = AttributeWrapper(
comms_annotation=["RCU_PCB_version_R"], datatype=str, dims=(N_rcu,)

Hannes Feldt
committed
)
RCU_PCB_number_R = AttributeWrapper(
comms_annotation=["RCU_PCB_number_R"], datatype=str, dims=(N_rcu,)

Hannes Feldt
committed
)
RCU_ADC_locked_R = AttributeWrapper(
comms_annotation=["RCU_ADC_locked_R"], datatype=bool, dims=(N_rcu, N_rcu_inp)

Hannes Feldt
committed
)
RCU_DTH_freq_R = AttributeWrapper(
comms_annotation=["RCU_DTH_freq_R"],
datatype=numpy.int64,
dims=(N_rcu, N_rcu_inp),

Hannes Feldt
committed
)
RCU_DTH_freq_RW = AttributeWrapper(
comms_annotation=["RCU_DTH_freq_RW"],

Hannes Feldt
committed
datatype=numpy.int64,

Hannes Feldt
committed
access=AttrWriteType.READ_WRITE,
)
RCU_DTH_PWR_R = AttributeWrapper(
comms_annotation=["RCU_DTH_PWR_R"],
datatype=numpy.float64,
dims=(N_rcu, N_rcu_inp),
doc="RCU Dither source power (dBm). Range -25 to -4.",
RCU_DTH_PWR_RW = AttributeWrapper(
comms_annotation=["RCU_DTH_PWR_RW"],
datatype=numpy.float64,
access=AttrWriteType.READ_WRITE,
doc="RCU Dither source power (dBm). Range -25 to -4.",
)
RCU_DTH_on_R = AttributeWrapper(
comms_annotation=["RCU_DTH_on_R"], datatype=bool, dims=(N_rcu, N_rcu_inp)
)

Jan David Mol
committed
# ----------
# Summarising Attributes
# ----------

Hannes Feldt
committed
RCU_LED_colour_R = attribute(
dtype=(numpy.uint32,), max_dim_x=N_rcu, fisallowed="is_attribute_access_allowed"
)

Jan David Mol
committed

Hannes Feldt
committed
ANT_error_R = attribute(
dtype=((bool,),),
max_dim_y=N_rcu,
max_dim_x=N_rcu_inp,
fisallowed="is_attribute_access_allowed",
)

Jan David Mol
committed
RCU_error_R = attribute(
dtype=(bool,), max_dim_x=N_rcu, fisallowed="is_attribute_access_allowed"
)

Jan David Mol
committed

Hannes Feldt
committed
RECV_IOUT_error_R = attribute(
dtype=((bool,),),
max_dim_y=N_rcu,
max_dim_x=N_rcu_inp,
fisallowed="is_attribute_access_allowed",
)
RECV_TEMP_error_R = attribute(
dtype=(bool,),
max_dim_x=N_rcu,
fisallowed="is_attribute_access_allowed",
)
RECV_VOUT_error_R = attribute(
dtype=(bool,), max_dim_x=N_rcu, fisallowed="is_attribute_access_allowed"
)
def read_RCU_LED_colour_R(self):
return (
2 * self.read_attribute("RCU_LED_green_on_R")
+ 4 * self.read_attribute("RCU_LED_red_on_R")
).astype(numpy.uint32)
def read_RECV_TEMP_error_R(self):

Jan David Mol
committed
# Don't apply the mask here --- we always want to know if things get too hot!

Hannes Feldt
committed
return self.alarm_val("RCU_TEMP_R")
def read_ANT_error_R(self):
return self.read_attribute("ANT_mask_RW") & (
~self.read_attribute("RCU_ADC_locked_R")
)
def read_RCU_error_R(self):
return self.read_attribute("RCU_mask_RW") & (
(self.read_attribute("RECVTR_I2C_error_R") > 0)
| self.alarm_val("RCU_PCB_ID_R")
)
def read_RECV_IOUT_error_R(self):
return self.read_attribute("ANT_mask_RW") & (
self.alarm_val("RCU_PWR_ANT_IOUT_R")
)
def read_RECV_VOUT_error_R(self):

Hannes Feldt
committed
return (
self.read_attribute("ANT_mask_RW")
& (
self.alarm_val("RCU_PWR_ANT_VIN_R")
| self.alarm_val("RCU_PWR_ANT_VOUT_R")

Hannes Feldt
committed
)
).any(axis=1) | (
self.read_attribute("RCU_mask_RW")
& (
self.alarm_val("RCU_PWR_1V8_R")
| self.alarm_val("RCU_PWR_2V5_R")
| self.alarm_val("RCU_PWR_3V3_R")
| ~self.read_attribute("RCU_PWR_DIGITAL_on_R")
| ~self.read_attribute("RCU_PWR_good_R")

Hannes Feldt
committed
)
)
# --------
# overloaded functions
# --------

Jan David Mol
committed
def _read_hardware_powered_fraction_R(self):
"""Read attribute which monitors the power"""
mask = self.read_attribute("RCU_mask_RW")
powered = self.read_attribute("RCU_PWR_good_R")
try:
return numpy.count_nonzero(powered & mask) / numpy.count_nonzero(mask)
except ZeroDivisionError:
return 1.0

Jan David Mol
committed
def _power_hardware_on(self):
"""Power the RCUs."""
self.RCU_on()
self.wait_attribute("RECVTR_translator_busy_R", False, self.RCU_On_Off_timeout)
# NB: Powering on RCUs causes DTH to be turned off, which is what we want
# to create a steady baseline after powerup. This is done by RECVTR
# even if the RCUs are already powered on.
# Let dithering be configured in the background since other setup
# does not depend on it.
# (do not wait for RECVTR_translator_busy_R == False)

Jan David Mol
committed
def _power_hardware_off(self):
"""Turns off the RCUs."""
# Set the mask to all Trues
self.RCU_mask_RW = [True] * N_rcu
# Turn off the RCUs
self.RCU_off()
self.wait_attribute("RECVTR_translator_busy_R", False, self.RCU_On_Off_timeout)
# Restore the mask
@only_in_states(DEFAULT_COMMAND_STATES)
self.opcua_connection.call_method(["RCU_off"])
@only_in_states(DEFAULT_COMMAND_STATES)
self.opcua_connection.call_method(["RCU_on"])
@only_in_states(DEFAULT_COMMAND_STATES)
def RCU_DTH_off(self):
self.opcua_connection.call_method(["RCU_DTH_off"])
@only_in_states(DEFAULT_COMMAND_STATES)
def RCU_DTH_on(self):
self.opcua_connection.call_method(["RCU_DTH_on"])
@command()
@DebugIt()
@only_in_states(DEFAULT_COMMAND_STATES)
def RCU_DTH_setup(self):
"""
Configure dither power (RCU_DTH_PWR) and unique frequency (RCU_DTH_freq)
for each signal chain
:return:None
"""
self.opcua_connection.call_method(["RCU_DTH_setup"])
@command(dtype_in=DevString, dtype_out=DevLong)
def get_rcu_band_from_filter(self, filter_name: str):
"""return the rcu band given the filter name"""
return bands[filter_name].rcu_band