Select Git revision
-
Corné Lukken authoredCorné Lukken authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
recv.py 15.94 KiB
# -*- coding: utf-8 -*-
#
# This file is part of the RECV project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" RECV Device Server for LOFAR2.0
"""
# PyTango imports
from tango import DebugIt
from tango.server import command
from tango.server import device_property, attribute
from tango import AttrWriteType, DevVarFloatArray, DevString, DevLong
import numpy
# Additional import
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.common.lofar_logging import device_logging_to_python
from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
from tangostationcontrol.devices.device_decorators import only_in_states
from tangostationcontrol.devices.opcua_device import opcua_device
from tangostationcontrol.devices.lofar_device import lofar_device
import logging
logger = logging.getLogger()
__all__ = ["RECV", "main"]
@device_logging_to_python()
class RECV(opcua_device):
FILTER_RCU_DICT = {
"LBA_10_90": 1,
"LBA_10_70": 1,
"LBA_30_90": 2,
"LBA_30_70": 2,
"HBA_170_230": 1,
"HBA_110_190": 2,
"HBA_210_250": 4
}
# -----------------
# Device Properties
# -----------------
# ----- Default settings
ANT_mask_RW_default = device_property(
dtype='DevVarBooleanArray',
mandatory=False,
default_value=[[True] * 3] * 32
)
RCU_mask_RW_default = device_property(
dtype='DevVarBooleanArray',
mandatory=False,
default_value=[True] * 32
)
RCU_band_select_RW_default = device_property(
dtype='DevVarLong64Array',
mandatory=False,
default_value=[[0] * 3] * 32
)
RCU_PWR_ANT_on_RW = device_property(
dtype='DevVarLong64Array',
mandatory=False,
default_value=[[False] * 3] * 32 # turn power off by default in test setups, f.e. to prevent blowing up the noise sources
)
RECVTR_monitor_rate_RW_default = device_property(
dtype='DevLong64',
mandatory=False,
default_value=1
)
TRANSLATOR_DEFAULT_SETTINGS = [
'ANT_mask_RW',
'RCU_mask_RW'
]
# ----- Timing values
RCU_On_Off_timeout = device_property(
doc='Maximum amount of time to wait after turning RCU(s) on or off',
dtype='DevFloat',
mandatory=False,
default_value=30.0
)
RCU_DTH_On_Off_timeout = device_property(
doc='Maximum amount of time to wait after turning dithering on or off',
dtype='DevFloat',
mandatory=False,
default_value=30.0
)
# ----- Calibration values
HBAT_bf_delay_step_delays = device_property(
dtype="DevVarFloatArray",
mandatory=False,
default_value=numpy.array([
0.0, 0.5228E-9, 0.9797E-9, 1.4277E-9, 1.9055E-9,
2.4616E-9, 2.9539E-9, 3.4016E-9, 3.8076E-9, 4.3461E-9,
4.9876E-9, 5.4894E-9, 5.7973E-9, 6.2707E-9, 6.8628E-9,
7.3989E-9, 8.0673E-9, 8.6188E-9, 9.1039E-9, 9.5686E-9,
10.0463E-9, 10.5774E-9, 11.0509E-9, 11.5289E-9, 11.9374E-9,
12.4524E-9, 13.0842E-9, 13.5936E-9, 13.9198E-9, 14.4087E-9,
14.9781E-9, 15.5063E-9
],dtype=numpy.float64)
)
HBAT_signal_input_delays = device_property(
doc='Signal input delay calibration values for the elements within a tile.',
dtype='DevVarFloatArray',
mandatory=False,
default_value = numpy.zeros((32,), dtype=numpy.float64)
)
# ----------
# Attributes
# ----------
ANT_mask_RW = attribute_wrapper(comms_annotation=["ANT_mask_RW" ],datatype=bool , dims=(3,32), access=AttrWriteType.READ_WRITE)
# The HBAT beamformer delays represent 32 delays for each of the 96 inputs.
# The 32 delays deconstruct as delays[polarisation][dipole], and each delay is the number of 'delay steps' to apply (0.5ns for HBAT1).
HBAT_BF_delay_steps_R = attribute_wrapper(comms_annotation=["HBAT_BF_delay_steps_R" ],datatype=numpy.int64 , dims=(32,96))
HBAT_BF_delay_steps_RW = attribute_wrapper(comms_annotation=["HBAT_BF_delay_steps_RW" ],datatype=numpy.int64 , dims=(32,96), access=AttrWriteType.READ_WRITE)
HBAT_LED_on_R = attribute_wrapper(comms_annotation=["HBAT_LED_on_R" ],datatype=bool , dims=(32,96))
HBAT_LED_on_RW = attribute_wrapper(comms_annotation=["HBAT_LED_on_RW" ],datatype=bool , dims=(32,96), access=AttrWriteType.READ_WRITE)
HBAT_PWR_LNA_on_R = attribute_wrapper(comms_annotation=["HBAT_PWR_LNA_on_R" ],datatype=bool , dims=(32,96))
HBAT_PWR_LNA_on_RW = attribute_wrapper(comms_annotation=["HBAT_PWR_LNA_on_RW" ],datatype=bool , dims=(32,96), access=AttrWriteType.READ_WRITE)
HBAT_PWR_on_R = attribute_wrapper(comms_annotation=["HBAT_PWR_on_R" ],datatype=bool , dims=(32,96))
HBAT_PWR_on_RW = attribute_wrapper(comms_annotation=["HBAT_PWR_on_RW" ],datatype=bool , dims=(32,96), access=AttrWriteType.READ_WRITE)
RCU_ADC_locked_R = attribute_wrapper(comms_annotation=["RCU_ADC_locked_R" ],datatype=bool , dims=(3,32))
RCU_attenuator_dB_R = attribute_wrapper(comms_annotation=["RCU_attenuator_dB_R" ],datatype=numpy.int64 , dims=(3,32))
RCU_attenuator_dB_RW = attribute_wrapper(comms_annotation=["RCU_attenuator_dB_RW" ],datatype=numpy.int64 , dims=(3,32), access=AttrWriteType.READ_WRITE)
RCU_band_select_R = attribute_wrapper(comms_annotation=["RCU_band_select_R" ],datatype=numpy.int64 , dims=(3,32))
RCU_band_select_RW = attribute_wrapper(comms_annotation=["RCU_band_select_RW" ],datatype=numpy.int64 , dims=(3,32), access=AttrWriteType.READ_WRITE)
RCU_DTH_freq_R = attribute_wrapper(comms_annotation=["RCU_DTH_freq_R" ],datatype=numpy.int64 , dims=(3,32))
RCU_DTH_freq_RW = attribute_wrapper(comms_annotation=["RCU_DTH_freq_RW" ],datatype=numpy.int64 , dims=(3,32), access=AttrWriteType.READ_WRITE)
RCU_DTH_on_R = attribute_wrapper(comms_annotation=["RCU_DTH_on_R" ],datatype=bool , dims=(3,32))
RCU_LED_green_on_R = attribute_wrapper(comms_annotation=["RCU_LED_green_on_R" ],datatype=bool , dims=(32,))
RCU_LED_green_on_RW = attribute_wrapper(comms_annotation=["RCU_LED_green_on_RW" ],datatype=bool , dims=(32,), access=AttrWriteType.READ_WRITE)
RCU_LED_red_on_R = attribute_wrapper(comms_annotation=["RCU_LED_red_on_R" ],datatype=bool , dims=(32,))
RCU_LED_red_on_RW = attribute_wrapper(comms_annotation=["RCU_LED_red_on_RW" ],datatype=bool , dims=(32,), access=AttrWriteType.READ_WRITE)
RCU_mask_RW = attribute_wrapper(comms_annotation=["RCU_mask_RW" ],datatype=bool , dims=(32,), access=AttrWriteType.READ_WRITE)
RCU_PCB_ID_R = attribute_wrapper(comms_annotation=["RCU_PCB_ID_R" ],datatype=numpy.int64 , dims=(32,))
RCU_PCB_number_R = attribute_wrapper(comms_annotation=["RCU_PCB_number_R" ],datatype=str , dims=(32,))
RCU_PCB_version_R = attribute_wrapper(comms_annotation=["RCU_PCB_version_R" ],datatype=str , dims=(32,))
RCU_PWR_1V8_R = attribute_wrapper(comms_annotation=["RCU_PWR_1V8_R" ],datatype=numpy.float64, dims=(32,))
RCU_PWR_2V5_R = attribute_wrapper(comms_annotation=["RCU_PWR_2V5_R" ],datatype=numpy.float64, dims=(32,))
RCU_PWR_3V3_R = attribute_wrapper(comms_annotation=["RCU_PWR_3V3_R" ],datatype=numpy.float64, dims=(32,))
RCU_PWR_ANALOG_on_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANALOG_on_R" ],datatype=bool , dims=(32,))
RCU_PWR_ANT_IOUT_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_IOUT_R" ],datatype=numpy.float64, dims=(3,32))
RCU_PWR_ANT_on_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_on_R" ],datatype=bool , dims=(3,32))
RCU_PWR_ANT_on_RW = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_on_RW" ],datatype=bool , dims=(3,32), access=AttrWriteType.READ_WRITE)
RCU_PWR_ANT_VIN_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_VIN_R" ],datatype=numpy.float64, dims=(3,32))
RCU_PWR_ANT_VOUT_R = attribute_wrapper(comms_annotation=["RCU_PWR_ANT_VOUT_R" ],datatype=numpy.float64, dims=(3,32))
RCU_PWR_DIGITAL_on_R = attribute_wrapper(comms_annotation=["RCU_PWR_DIGITAL_on_R" ],datatype=bool , dims=(32,))
RCU_PWR_good_R = attribute_wrapper(comms_annotation=["RCU_PWR_good_R" ],datatype=bool , dims=(32,))
RCU_TEMP_R = attribute_wrapper(comms_annotation=["RCU_TEMP_R" ],datatype=numpy.float64, dims=(32,))
RECVTR_I2C_error_R = attribute_wrapper(comms_annotation=["RECVTR_I2C_error_R" ],datatype=numpy.int64 , dims=(32,))
RECVTR_monitor_rate_RW = attribute_wrapper(comms_annotation=["RECVTR_monitor_rate_RW" ],datatype=numpy.int64 , access=AttrWriteType.READ_WRITE)
RECVTR_translator_busy_R = attribute_wrapper(comms_annotation=["RECVTR_translator_busy_R" ],datatype=bool)
# ----------
# Summarising Attributes
# ----------
RCU_LED_colour_R = attribute(dtype=(numpy.uint32,), max_dim_x=32, fisallowed="is_attribute_access_allowed")
def read_RCU_LED_colour_R(self):
return (2 * self.read_attribute("RCU_LED_green_on_R") + 4 * self.read_attribute("RCU_LED_red_on_R")).astype(numpy.uint32)
RCU_error_R = attribute(dtype=(bool,), max_dim_x=32, fisallowed="is_attribute_access_allowed")
ANT_error_R = attribute(dtype=((bool,),), max_dim_x=3, max_dim_y=32, fisallowed="is_attribute_access_allowed")
def read_RCU_error_R(self):
return self.read_attribute("RCU_mask_RW") & (
(self.read_attribute("RECVTR_I2C_error_R") > 0)
| self.alarm_val("RCU_PCB_ID_R")
)
def read_ANT_error_R(self):
return self.read_attribute("ANT_mask_RW") & (
~self.read_attribute("RCU_ADC_locked_R")
)
RECV_IOUT_error_R = attribute(dtype=(bool,), max_dim_x=32)
RECV_TEMP_error_R = attribute(dtype=(bool,), max_dim_x=32, polling_period=1000)
RECV_VOUT_error_R = attribute(dtype=(bool,), max_dim_x=32)
def read_RECV_IOUT_error_R(self):
return (self.read_attribute("ANT_mask_RW") & (
self.alarm_val("RCU_PWR_ANT_IOUT_R")
)).any(axis=1)
def read_RECV_TEMP_error_R(self):
# Don't apply the mask here --- we always want to know if things get too hot!
return (
self.alarm_val("RCU_TEMP_R")
)
def read_RECV_VOUT_error_R(self):
return (self.read_attribute("ANT_mask_RW") & (
self.alarm_val("RCU_PWR_ANT_VIN_R")
| self.alarm_val("RCU_PWR_ANT_VOUT_R")
)).any(axis=1) | (self.read_attribute("RCU_mask_RW") & (
self.alarm_val("RCU_PWR_1V8_R")
| self.alarm_val("RCU_PWR_2V5_R")
| self.alarm_val("RCU_PWR_3V3_R")
| ~self.read_attribute("RCU_PWR_DIGITAL_on_R")
| ~self.read_attribute("RCU_PWR_good_R")
))
# --------
# overloaded functions
# --------
def properties_changed(self):
super().properties_changed()
# The HBAT can only apply positive delays, yet we want to apply a delay
# relative to the center of the tile, which typically requires negative
# delays for half of the elements.
#
# We circumvent this issue by increasing the delays for all elements
# by a fixed amount, the average of all steps. Doing so should result
# in positive delays regardless of the pointing direction.
self.HBAT_bf_delay_offset = numpy.mean(self.HBAT_bf_delay_step_delays)
def _initialise_hardware(self):
""" Initialise the RCU hardware. """
# Cycle RCUs
self.RCU_off()
self.wait_attribute("RECVTR_translator_busy_R", False, self.RCU_On_Off_timeout)
self.RCU_on()
self.wait_attribute("RECVTR_translator_busy_R", False, self.RCU_On_Off_timeout)
def _disable_hardware(self):
""" Disable the RECV hardware. """
# Save actual mask values
RCU_mask = self.proxy.RCU_mask_RW
# Set the mask to all Trues
self.RCU_mask_RW = [True] * 32
# Turn off the RCUs
self.RCU_off()
self.wait_attribute("RECVTR_translator_busy_R", False, self.RCU_On_Off_timeout)
# Restore the mask
self.RCU_mask_RW = RCU_mask
# --------
# internal functions
# --------
def _calculate_HBAT_bf_delay_steps(self, delays: numpy.ndarray):
"""
Helper function that converts a signal path delay (in seconds) to an analog beam weight,
which is a value per tile per dipole per polarisation.
"""
# Duplicate delay values per polarisation
polarised_delays = numpy.tile(delays, 2) # output dims -> 96x32
# Add signal input delay
calibrated_delays = numpy.add(polarised_delays, self.HBAT_signal_input_delays)
# Find the right delay step by looking for the closest match in property RECV-> HBAT_bf_delay_step_delays
def nearest_delay_step(delay):
# We want the index in the HBAT_bf_delay_steps_delay array which is closest to the given delay,
# shifted by HBAT_bf_delay_offset to obtain strictly positive delays.
return (numpy.abs(self.HBAT_bf_delay_step_delays - (delay + self.HBAT_bf_delay_offset))).argmin()
# Apply to all elements to convert each delay into the number of delay steps
return numpy.vectorize(nearest_delay_step)(calibrated_delays)
# --------
# Commands
# --------
@command(dtype_in=DevVarFloatArray, dtype_out=DevVarFloatArray)
def calculate_HBAT_bf_delay_steps(self, delays: numpy.ndarray):
""" converts a signal path delay (in seconds) to an analog beam weight """
# Reshape the flatten input array, into whatever how many tiles we get
delays = numpy.array(delays).reshape(-1,16)
# Calculate the beam weight array
HBAT_bf_delay_steps = self._calculate_HBAT_bf_delay_steps(delays)
return HBAT_bf_delay_steps.flatten()
@command(dtype_in=DevString, dtype_out=DevLong)
def get_rcu_band_from_filter(self, filter_name: str):
""" return the rcu band given the filter name"""
return self.FILTER_RCU_DICT.get(filter_name, -1)
@command()
@DebugIt()
@only_in_states(lofar_device.DEFAULT_COMMAND_STATES)
def RCU_off(self):
"""
:return:None
"""
self.opcua_connection.call_method(["RCU_off"])
@command()
@DebugIt()
@only_in_states(lofar_device.DEFAULT_COMMAND_STATES)
def RCU_on(self):
"""
:return:None
"""
self.opcua_connection.call_method(["RCU_on"])
@command()
@DebugIt()
@only_in_states(lofar_device.DEFAULT_COMMAND_STATES)
def RCU_DTH_off(self):
"""
:return:None
"""
self.opcua_connection.call_method(["RCU_DTH_off"])
@command()
@DebugIt()
@only_in_states(lofar_device.DEFAULT_COMMAND_STATES)
def RCU_DTH_on(self):
"""
:return:None
"""
self.opcua_connection.call_method(["RCU_DTH_on"])
# ----------
# Run server
# ----------
def main(**kwargs):
"""Main function of the RECV module."""
return entry(RECV, **kwargs)