Skip to content
Snippets Groups Projects
Commit a4c0fa26 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-276: Initial implementation of XST device, made payload errors refer to...

L2SS-276: Initial implementation of XST device, made payload errors refer to FPGA they're from instead of which input they're for, and added subbands_calibrated flag to exposed SST parameters
parent 72520dc8
Branches
Tags
1 merge request!113L2SS-276: Create XST device
......@@ -741,6 +741,28 @@
}
}
},
"XST": {
"LTS": {
"XST": {
"LTS/XST/1": {
"properties": {
"Statistics_Client_Port": [
"5002"
],
"OPC_Server_Name": [
"dop36.astron.nl"
],
"OPC_Server_Port": [
"4840"
],
"OPC_Time_Out": [
"5.0"
]
}
}
}
}
},
"StatsCrosslet": {
"CS997": {
"StatsCrosslet": {
......
......@@ -65,15 +65,17 @@ class SST(Statistics):
FPGA_sst_offload_selector_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_selector_R"], datatype=numpy.bool_, dims=(16,))
# number of packets with valid payloads
nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64)
nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(SSTCollector.MAX_FPGAS,), datatype=numpy.uint64)
# number of packets with invalid payloads
nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64)
nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(SSTCollector.MAX_FPGAS,), datatype=numpy.uint64)
# latest SSTs
sst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_values"}, dims=(SSTCollector.MAX_SUBBANDS, SSTCollector.MAX_INPUTS), datatype=numpy.uint64)
# reported timestamp for each row in the latest SSTs
sst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "sst_timestamps"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64)
# integration interval for each row in the latest SSTs
integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.float32)
# whether the subband data was calibrated by the SDP (that is, were subband weights applied)
subbands_calibrated_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "subbands_calibrated"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.bool_)
# --------
# Overloaded functions
......
......@@ -10,15 +10,12 @@ logger = logging.getLogger()
class StatisticsCollector(Thread):
""" Base class to process statistics packets from a queue, asynchronously. """
# Maximum number of antenna inputs we support (used to determine array sizes)
MAX_INPUTS = 192
# Maximum number of subbands we support (used to determine array sizes)
MAX_SUBBANDS = 512
# Maximum time to wait for the Thread to get unstuck, if we want to stop
DISCONNECT_TIMEOUT = 10.0
# Maximum number of FPGAs we receive data from (used for diagnostics)
MAX_FPGAS = 16
def __init__(self, queue: Queue):
self.queue = queue
self.last_packet = None
......@@ -100,15 +97,16 @@ class SSTCollector(StatisticsCollector):
defaults.update({
# Number of packets received so far that we could parse correctly and do not have a payload error
"nof_valid_payloads": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64),
"nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),
# Packets that reported a payload error
"nof_payload_errors": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.uint64),
"nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),
# Last value array we've constructed out of the packets
"sst_values": numpy.zeros((self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64),
"sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64),
"integration_intervals": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float32),
"subbands_calibrated": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.bool_),
})
return defaults
......@@ -125,13 +123,105 @@ class SSTCollector(StatisticsCollector):
if fields.payload_error:
# cannot trust the data if a payload error is reported
self.parameters["nof_payload_errors"][input_index] += numpy.uint64(1)
self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1)
# don't raise, as packet is valid
return
# process the packet
self.parameters["nof_valid_payloads"][input_index] += numpy.uint64(1)
self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1)
self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload
self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp())
self.parameters["integration_intervals"][input_index] = fields.integration_interval()
self.parameters["subbands_calibrated"][input_index] = fields.subband_calibrated_flagt
def nr_baselines(nr_inputs: int) -> int:
return nr_inputs * (nr_inputs + 1) // 2
def baseline_index(major: int, minor: int) -> int:
if major < minor:
raise ValueError(f"major < minor: {major} < {minor}. Since we do not store the conjugates this will lead to processing errors.")
return major * (major + 1) // 2 + minor
class XSTCollector(StatisticsCollector):
""" Class to process XST statistics packets. """
# Maximum number of antenna inputs we support (used to determine array sizes)
MAX_INPUTS = 192
# Maximum number of baselines we can receive
MAX_BASELINES = nr_baselines(MAX_INPUTS)
# Expected block size is BLOCK_LENGTH x BLOCK_LENGTH
BLOCK_LENGTH = 12
# Expected number of blocks: enough to cover all baselines without the conjugates (that is, the top-left triangle of the matrix).
MAX_BLOCKS = nr_baselines(MAX_INPUTS // BLOCK_LENGTH)
# Maximum number of subbands we support (used to determine array sizes)
MAX_SUBBANDS = 512
# Complex values are (real, imag). A bit silly, but we don't want magical constants.
VALUES_PER_COMPLEX = 2
def _default_parameters(self):
defaults = super()._default_parameters()
defaults.update({
# Number of packets received so far that we could parse correctly and do not have a payload error
"nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),
# Packets that reported a payload error
"nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),
# Last value array we've constructed out of the packets
"xst_values": numpy.zeros((self.MAX_BLOCKS, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX), dtype=numpy.float64),
"xst_timestamps": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float64),
"xst_subbands": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.uint16),
"integration_intervals": numpy.zeros((self.MAX_BLOCKS,), dtype=numpy.float32),
})
return defaults
def process_packet(self, packet):
fields = XSTPacket(packet)
if fields.payload_error:
# cannot trust the data if a payload error is reported
self.parameters["nof_payload_errors"][fields.gn_index] += numpy.uint64(1)
# don't raise, as packet is valid
return
# the blocks must be of size BLOCK_LENGTH x BLOCK_LENGTH
if fields.nof_signal_inputs != self.BLOCK_LENGTH:
raise ValueError("Packet describes a block of {0} x {0} baselines, but we can only parse blocks of {1} x {1} baselines".format(fields.nof_signal_inputs, self.BLOCK_LENGTH))
# check whether set of baselines in this packet are not out of bounds
for antenna in (0,1):
if fields.first_baseline[antenna] + nof_signal_inputs >= self.MAX_INPUTS:
# packet describes an input that is out of bounds for us
raise ValueError("Packet describes {0} x {0} baselines starting at {1}, but we are limited to describing MAX_INPUTS={2}".format(fields.nof_signal_inputs, fields.first_baseline, self.MAX_INPUTS))
# the blocks of baselines need to be tightly packed, and thus be provided at exact intervals
if fields.first_baseline[antenna] % self.BLOCK_LENGTH != 0:
raise ValueError("Packet describes baselines starting at %s, but we require a multiple of BLOCK_LENGTH=%d" % (fields.first_baseline, self.MAX_INPUTS))
# the payload contains complex values for the block of baselines of size BLOCK_LENGTH x BLOCK_LENGTH
# starting at baseline first_baseline.
#
# we honour this format, as we want to keep the metadata together with these blocks. we do need to put the blocks in a linear
# and tight order, however, so we calculate a block index.
block_index = baseline_index(fields.first_baseline[0] // self.BLOCK_LENGTH, fields.first_baseline[1] // self.BLOCK_LENGTH)
# process the packet
self.parameters["nof_valid_payloads"][fields.gn_index] += numpy.uint64(1)
block_index = baseline_index(fields.first_baseline[0], fields.first_baseline[1])
self.parameters["xst_values"][block_index][:fields.nof_statistics_per_packet] = fields.payload
self.parameters["xst_timestamps"][block_index] = numpy.float64(fields.timestamp().timestamp())
self.parameters["xst_subbands"][block_index] = numpy.uint16(fields.subband_index)
self.parameters["integration_intervals"][block_index] = fields.integration_interval()
......@@ -117,9 +117,9 @@ class StatisticsPacket(object):
self.nyquist_zone_index = get_bit_value(self.source_info, 13, 14)
self.t_adc = get_bit_value(self.source_info, 12)
self.fsub_type = get_bit_value(self.source_info, 11)
self.payload_error = get_bit_value(self.source_info, 10)
self.beam_repositioning_flag = get_bit_value(self.source_info, 9)
self.subband_calibrated_flag = get_bit_value(self.source_info, 8)
self.payload_error = (get_bit_value(self.source_info, 10) != 0)
self.beam_repositioning_flag = (get_bit_value(self.source_info, 9) != 0)
self.subband_calibrated_flag = (get_bit_value(self.source_info, 8) != 0)
# self.source_info 5-7 are reserved
self.gn_index = get_bit_value(self.source_info, 0, 4)
......@@ -263,9 +263,10 @@ class XSTPacket(StatisticsPacket):
The following fields are exposed as properties & functions.
subband_index: subband number for which this packet contains statistics.
baseline: antenna pair for which this packet contains statistics.
first_baseline: first antenna pair for which this packet contains statistics.
payload[nof_signal_inputs][nof_signal_inputs] the baselines, starting from first_baseline
"""
def __init__(self, packet):
......@@ -281,13 +282,13 @@ class XSTPacket(StatisticsPacket):
super().unpack_data_id()
self.subband_index = get_bit_value(self.data_id, 16, 24)
self.baseline = (get_bit_value(self.data_id, 8, 15), get_bit_value(self.data_id, 0, 7))
self.first_baseline = (get_bit_value(self.data_id, 8, 15), get_bit_value(self.data_id, 0, 7))
def header(self):
header = super().header()
header["data_id"]["subband_index"] = self.subband_index
header["data_id"]["baseline"] = self.baseline
header["data_id"]["first_baseline"] = self.first_baseline
return header
......
# -*- coding: utf-8 -*-
#
# This file is part of the XST project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" XST Device Server for LOFAR2.0
"""
# TODO(Corne): Remove sys.path.append hack once packaging is in place!
import os, sys
currentdir = os.path.dirname(os.path.realpath(__file__))
parentdir = os.path.dirname(currentdir)
parentdir = os.path.dirname(parentdir)
sys.path.append(parentdir)
# PyTango imports
from tango.server import run
from tango.server import device_property, attribute
from tango import AttrWriteType
# Additional import
from clients.attribute_wrapper import attribute_wrapper
from clients.opcua_client import OPCUAConnection
from clients.statistics_client import StatisticsClient
from devices.hardware_device import hardware_device
from common.lofar_git import get_version
from common.lofar_logging import device_logging_to_python, log_exceptions
from devices.sdp.statistics import Statistics
from devices.sdp.statistics_collector import XSTCollector
import numpy
__all__ = ["XST", "main"]
class XST(Statistics):
STATISTICS_COLLECTOR_CLASS = XSTCollector
# -----------------
# Device Properties
# -----------------
# ----------
# Attributes
# ----------
# FPGA control points for XSTs
FPGA_xst_integration_interval_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_integration_interval_RW"], datatype=numpy.double, dims=(8,16), access=AttrWriteType.READ_WRITE)
FPGA_xst_integration_interval_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_integration_interval_R"], datatype=numpy.double, dims=(8,16))
FPGA_xst_offload_enable_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_enable_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_xst_offload_enable_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_enable_R"], datatype=numpy.bool_, dims=(16,))
FPGA_xst_offload_hdr_eth_destination_mac_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_eth_destination_mac_RW"], datatype=numpy.str_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_xst_offload_hdr_eth_destination_mac_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_eth_destination_mac_R"], datatype=numpy.str_, dims=(16,))
FPGA_xst_offload_hdr_ip_destination_address_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_ip_destination_address_RW"], datatype=numpy.str_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_xst_offload_hdr_ip_destination_address_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_ip_destination_address_R"], datatype=numpy.str_, dims=(16,))
FPGA_xst_offload_hdr_udp_destination_port_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_udp_destination_port_RW"], datatype=numpy.uint16, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_xst_offload_hdr_udp_destination_port_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_hdr_udp_destination_port_R"], datatype=numpy.uint16, dims=(16,))
FPGA_xst_offload_subband_select_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_subband_select_RW"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_xst_offload_subband_select_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_xst_offload_subband_select_R"], datatype=numpy.uint32, dims=(16,))
# number of packets with valid payloads
nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(XSTCollector.MAX_FPGAS,), datatype=numpy.uint64)
# number of packets with invalid payloads
nof_payload_errors_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_payload_errors"}, dims=(XSTCollector.MAX_FPGAS,), datatype=numpy.uint64)
# latest XSTs
xst_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_values"}, dims=(XSTCollector.BLOCK_LENGTH * XSTCollector.BLOCK_LENGTH * XSTCollector.VALUES_PER_COMPLEX, XSTCollector.MAX_BLOCKS), datatype=numpy.float64)
# reported timestamp for each row in the latest XSTs
xst_timestamp_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_timestamps"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.uint64)
# which subband the XSTs describe
xst_subbands_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "xst_subbands"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.uint16)
# integration interval for each row in the latest XSTs
integration_interval_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "integration_intervals"}, dims=(XSTCollector.MAX_BLOCKS,), datatype=numpy.float32)
# --------
# Overloaded functions
# --------
# --------
# Commands
# --------
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
"""Main function of the XST Device module."""
from common.lofar_logging import configure_logger
configure_logger()
return run((XST,), args=args, **kwargs)
if __name__ == '__main__':
main()
#
# Docker compose file that launches an interactive iTango session.
#
# Connect to the interactive session with 'docker attach itango'.
# Disconnect with the Docker deattach sequence: <CTRL>+<P> <CTRL>+<Q>
#
# Defines:
# - itango: iTango interactive session
#
# Requires:
# - lofar-device-base.yml
#
version: '2'
services:
device-sst:
image: device-xst
# build explicitly, as docker-compose does not understand a local image
# being shared among services.
build:
context: lofar-device-base
args:
SOURCE_IMAGE: ${DOCKER_REGISTRY_HOST}/${DOCKER_REGISTRY_USER}-tango-itango:${TANGO_ITANGO_VERSION}
container_name: ${CONTAINER_NAME_PREFIX}device-xst
networks:
- control
- data
ports:
- "5002:5002/udp" # port to receive XST UDP packets on
- "5704:5704" # unique port for this DS
volumes:
- ${TANGO_LOFAR_CONTAINER_MOUNT}
environment:
- TANGO_HOST=${TANGO_HOST}
entrypoint:
- /usr/local/bin/wait-for-it.sh
- ${TANGO_HOST}
- --timeout=30
- --strict
- --
# configure CORBA to _listen_ on 0:port, but tell others we're _reachable_ through ${HOSTNAME}:port, since CORBA
# can't know about our Docker port forwarding
- python3 -u ${TANGO_LOFAR_CONTAINER_DIR}/devices/devices/sdp/xst.py LTS -v -ORBendPoint giop:tcp:0:5704 -ORBendPointPublish giop:tcp:${HOSTNAME}:5704
restart: on-failure
......@@ -2,6 +2,7 @@
pcc = DeviceProxy("LTS/PCC/1")
sdp = DeviceProxy("LTS/SDP/1")
sst = DeviceProxy("LTS/SST/1")
xst = DeviceProxy("LTS/XST/1")
# Put them in a list in case one wants to iterate
devices = [pcc, sdp, sst]
devices = [pcc, sdp, sst, xst]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment