Skip to content
Snippets Groups Projects
Commit d1cbf0b6 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'L2SS-633-fill-in-bst-device' into 'master'

Resolve L2SS-633 "Fill in bst device"

Closes L2SS-633

See merge request !347
parents 2d44dd8d 04d511b2
No related branches found
No related tags found
1 merge request!347Resolve L2SS-633 "Fill in bst device"
...@@ -11,16 +11,102 @@ ...@@ -11,16 +11,102 @@
""" """
from tango.server import device_property, attribute
from tango import AttrWriteType
# Own imports # Own imports
from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.devices.lofar_device import lofar_device from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
from tangostationcontrol.clients.opcua_client import OPCUAConnection
from tangostationcontrol.clients.statistics_client import StatisticsClient
from tangostationcontrol.devices.sdp.statistics import Statistics
from tangostationcontrol.devices.sdp.statistics_collector import BSTCollector from tangostationcontrol.devices.sdp.statistics_collector import BSTCollector
import numpy
__all__ = ["BST", "main"] __all__ = ["BST", "main"]
class BST(lofar_device): class BST(Statistics):
STATISTICS_COLLECTOR_CLASS = BSTCollector STATISTICS_COLLECTOR_CLASS = BSTCollector
# -----------------
# Device Properties
# -----------------
FPGA_bst_offload_hdr_eth_destination_mac_RW_default = device_property(
dtype='DevVarStringArray',
mandatory=True
)
FPGA_bst_offload_hdr_ip_destination_address_RW_default = device_property(
dtype='DevVarStringArray',
mandatory=True
)
FPGA_bst_offload_hdr_udp_destination_port_RW_default = device_property(
dtype='DevVarUShortArray',
mandatory=True
)
FPGA_bst_offload_enable_RW_default = device_property(
dtype='DevVarBooleanArray',
mandatory=False,
default_value=[True] * 16
)
first_default_settings = [
'FPGA_bst_offload_hdr_eth_destination_mac_RW',
'FPGA_bst_offload_hdr_ip_destination_address_RW',
'FPGA_bst_offload_hdr_udp_destination_port_RW',
# enable only after the offloading is configured correctly
'FPGA_bst_offload_enable_RW'
]
# ----------
# Attributes
# ----------
# FPGA control points for BSTs
FPGA_bst_offload_enable_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_bst_offload_enable_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_bst_offload_enable_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_bst_offload_enable_R"], datatype=numpy.bool_, dims=(16,))
FPGA_bst_offload_hdr_eth_destination_mac_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_bst_offload_hdr_eth_destination_mac_RW"], datatype=numpy.str, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_bst_offload_hdr_eth_destination_mac_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_bst_offload_hdr_eth_destination_mac_R"], datatype=numpy.str, dims=(16,))
FPGA_bst_offload_hdr_ip_destination_address_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_bst_offload_hdr_ip_destination_address_RW"], datatype=numpy.str, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_bst_offload_hdr_ip_destination_address_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_bst_offload_hdr_ip_destination_address_R"], datatype=numpy.str, dims=(16,))
FPGA_bst_offload_hdr_udp_destination_port_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_bst_offload_hdr_udp_destination_port_RW"], datatype=numpy.uint16, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_bst_offload_hdr_udp_destination_port_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_bst_offload_hdr_udp_destination_port_R"], datatype=numpy.uint16, dims=(16,))
FPGA_bst_offload_bsn_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["FPGA_bst_offload_bsn_R"], datatype=numpy.int64, dims=(16,))
FPGA_bst_offload_nof_packets_R = attribute_wrapper(comms_annotation=["FPGA_bst_offload_nof_packets_R"], datatype=numpy.int32, dims=(16,))
FPGA_bst_offload_nof_valid_R = attribute_wrapper(comms_annotation=["FPGA_bst_offload_nof_valid_R"], datatype=numpy.int32, dims=(16,))
# number of packets with valid payloads
nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(BSTCollector.MAX_FPGAS,), datatype=numpy.uint64)
# number of packets with invalid payloads
nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(BSTCollector.MAX_FPGAS,), datatype=numpy.uint64)
# latest BSTs
bst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "bst_values"}, dims=(BSTCollector.MAX_BEAMLETS, BSTCollector.MAX_BLOCKS), datatype=numpy.uint64)
# reported timestamp
# for each row in the latest BSTs
bst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "bst_timestamps"}, dims=(BSTCollector.MAX_BLOCKS,), datatype=numpy.uint64)
# ----------
# Summarising Attributes
# ----------
FPGA_processing_error_R = attribute(dtype=(bool,), max_dim_x=16)
def read_FPGA_processing_error_R(self):
return self.sdp_proxy.TR_fpga_mask_RW & (
~self.read_attribute("FPGA_bst_offload_enable_R")
)
# --------
# Overloaded functions
# --------
# --------
# Commands
# --------
# ---------- # ----------
# Run server # Run server
......
...@@ -8,9 +8,24 @@ ...@@ -8,9 +8,24 @@
# Distributed under the terms of the APACHE license. # Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info. # See LICENSE.txt for more info.
from .base import AbstractTestBases from .base import AbstractTestBases
from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy
class TestDeviceBST(AbstractTestBases.TestDeviceBase): class TestDeviceBST(AbstractTestBases.TestDeviceBase):
def setUp(self): def setUp(self):
"""Intentionally recreate the device object in each test""" """Intentionally recreate the device object in each test"""
super().setUp("STAT/BST/1") super().setUp("STAT/BST/1")
def test_device_read_all_attributes(self):
# We need to connect to SDP first to read some of our attributes
self.sdp_proxy = self.setup_sdp()
super().test_device_read_all_attributes()
def setup_sdp(self):
# setup SDP, on which this device depends
sdp_proxy = TestDeviceProxy("STAT/SDP/1")
sdp_proxy.off()
sdp_proxy.warm_boot()
sdp_proxy.set_defaults()
return sdp_proxy
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment