Select Git revision
-
Eric Kooistra authored
SVN copied data and hex directories so that they can be referred to from $RADIOHDL instead of from $UNB. SVN copied python, doc, matlab sub directories in tb or src to $RADIOHDL as well. Only common/hdllib.cfg still refers to $UNB because some low level components (multipliers) are not technology independent yet.
Eric Kooistra authored: SVN copied data and hex directories so that they can be referred to from $RADIOHDL instead of from $UNB. SVN copied python, doc, matlab sub directories in tb or src to $RADIOHDL as well. Only common/hdllib.cfg still refers to $UNB because some low level components (multipliers) are not technology independent yet.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
collector.py 21.14 KiB
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""Collectors for all types of statistics
This includes:
XSTCollector
BSTCollector
SSTCollector
"""
# logging use & lazy string, (too-few-public-methods)
# pylint: disable=W1203, R0903
import abc
import logging
import numpy
from lofar_station_client.math.baseline import baseline_from_index
from lofar_station_client.math.baseline import baseline_index
# TODO(Corne): Discuss moving to lofar_common_python library?
from lofar_station_client.math.baseline import nr_baselines
from lofar_station_client.statistics.packet import BSTPacket
from lofar_station_client.statistics.packet import SSTPacket
from lofar_station_client.statistics.packet import XSTPacket
from lofar_station_client.dts.constants import N_pol
# Module-scoped logger so records carry this module's name and can be
# filtered/configured per module, instead of attaching to the root logger.
logger = logging.getLogger(__name__)
class StatisticsCollector(abc.ABC):
    """Base class that aggregates statistics packets into parameter matrices."""

    # Upper bound on the number of FPGAs we track (diagnostic arrays are this size)
    MAX_FPGAS = 16

    def __init__(self):
        # Subclasses extend the parameter set via _default_parameters().
        self.parameters = self._default_parameters()

    def _default_parameters(self):
        """Return the parameter set shared by every type of statistics."""
        return {
            # Total packets offered for processing, parseable or not
            "nof_packets": numpy.uint64(0),
            # How many packets failed to parse
            "nof_invalid_packets": numpy.uint64(0),
            # Raw bytes of the most recent packet we rejected
            "last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8),
            # Stringified exception of the most recent rejected packet
            "last_invalid_packet_exception": "None",
            # gn_index registered per FPGA slot; -1 marks a free slot
            "gn_indices": numpy.full((self.MAX_FPGAS,), -1, dtype=numpy.int16),
        }

    def gn_index_to_fpga_nr(self, gn_index: int) -> int:
        """Map a packet's gn_index onto one of our MAX_FPGAS slots.

        FPGAs can carry any gn_index 0..255, but we only maintain statistics
        for MAX_FPGAS of them, so slots are handed out on a first-seen basis.
        An already-registered gn_index keeps its slot.

        Returns the slot index of the given gn_index.
        Raises a ValueError once all slots are occupied by other gn_indices.
        """
        known = self.parameters["gn_indices"]

        occupied = numpy.flatnonzero(known == gn_index)
        if occupied.size:
            # Seen before: keep using the same slot.
            return occupied[0]

        # New gn_index: claim the first free (-1) slot, if any remains.
        free = numpy.flatnonzero(known == -1)
        if free.size == 0:
            raise ValueError(
                f"Cannot register data coming from FPGA {gn_index}, as "
                f"the administration is filled with data from FPGAs "
                f"{self.parameters['gn_indices']}"
            )

        known[free[0]] = gn_index
        return free[0]

    def process_packet(self, packet):
        """Count and parse one packet, recording diagnostics on failure.

        Raises ValueError (chained to the original error) if the packet
        could not be parsed.
        """
        self.parameters["nof_packets"] += numpy.uint64(1)

        try:
            self._parse_packet(packet)
        except Exception as err:
            params = self.parameters
            params["nof_invalid_packets"] += numpy.uint64(1)
            # Keep the offending bytes and the reason for post-mortem debugging.
            params["last_invalid_packet"] = numpy.frombuffer(
                packet, dtype=numpy.uint8
            )
            params["last_invalid_packet_exception"] = str(err)
            raise ValueError("Could not parse statistics packet") from err

    @abc.abstractmethod
    def _parse_packet(self, packet):
        """Update the parameters based on a single packet."""
        raise NotImplementedError
class SSTCollector(StatisticsCollector):
    """Class to process SST statistics packets."""

    # Maximum number of antenna inputs we support (used to determine array sizes)
    MAX_INPUTS = 192
    # Maximum number of subbands we support (used to determine array sizes)
    MAX_SUBBANDS = 512

    def __init__(
        self, nr_signal_inputs: int = MAX_INPUTS, first_signal_input_index: int = 0
    ):
        # Record our input window before the base class builds the parameters,
        # as _default_parameters() depends on it.
        self.nr_signal_inputs = nr_signal_inputs
        self.first_signal_input_index = first_signal_input_index
        super().__init__()

    def _default_parameters(self):
        """Extend the base parameters with the SST-specific matrices."""
        nr_inputs = self.nr_signal_inputs
        sst_specific = {
            # Correctly parsed packets without a payload error, per FPGA
            "nof_valid_payloads": numpy.zeros(
                (self.MAX_FPGAS,), dtype=numpy.uint64
            ),
            # Parsed packets that reported a payload error, per FPGA
            "nof_payload_errors": numpy.zeros(
                (self.MAX_FPGAS,), dtype=numpy.uint64
            ),
            # Most recent SST values, per signal input and subband
            "sst_values": numpy.zeros(
                (nr_inputs, self.MAX_SUBBANDS), dtype=numpy.uint64
            ),
            # When each signal input last received data
            "sst_timestamps": numpy.zeros((nr_inputs,), dtype=numpy.float64),
            "integration_intervals": numpy.zeros((nr_inputs,), dtype=numpy.float32),
            "subbands_calibrated": numpy.zeros((nr_inputs,), dtype=bool),
        }
        return {**super()._default_parameters(), **sst_specific}

    def _parse_packet(self, packet):
        """Fold one SST packet into the parameter matrices."""
        fields = SSTPacket(packet)
        fpga_nr = self.gn_index_to_fpga_nr(fields.gn_index)

        # determine which input this packet contains data for
        input_index = fields.signal_input_index - self.first_signal_input_index
        if input_index < 0 or input_index >= self.nr_signal_inputs:
            # packet describes an input that is out of bounds for us
            raise ValueError(
                f"Packet describes input {fields.signal_input_index}, but we are "
                f"limited to describing {self.nr_signal_inputs} starting at index "
                f"{self.first_signal_input_index}"
            )

        if fields.payload_error:
            # cannot trust the data if a payload error is reported;
            # count it, but don't raise, as the packet itself is valid
            self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1)
            return

        # process the packet
        params = self.parameters
        params["nof_valid_payloads"][fpga_nr] += numpy.uint64(1)
        params["sst_values"][
            input_index, : fields.nof_statistics_per_packet
        ] = fields.payload()
        params["sst_timestamps"][input_index] = numpy.float64(
            fields.timestamp().timestamp()
        )
        params["integration_intervals"][input_index] = fields.integration_interval()
        params["subbands_calibrated"][input_index] = fields.subband_calibrated_flag
class XSTCollector(StatisticsCollector):
    """Class to process XST statistics packets.

    XSTs are received for up to MAX_PARALLEL_SUBBANDS simultaneously, and only the
    values of the last MAX_PARALLEL_SUBBANDS are kept. Raw data are collected for
    each subband in parameters["xst_blocks"], and overwritten if newer (younger)
    data is received for the same subband. As such, the data represent a rolling
    view on the XSTs.

    The xst_values() function is a user-friendly way to read the xst_blocks.

    The hardware can be configured to emit different and/or fewer subbands, causing
    some of the XSTs to become stale. It is therefore advised to inspect
    parameters["xst_timestamps"] as well.
    """

    # Maximum number of subbands for which we collect XSTs simultaneously
    MAX_PARALLEL_SUBBANDS = 8
    # Maximum number of antenna inputs we support (used to determine array sizes)
    MAX_INPUTS = 192
    # Maximum number of baselines we can receive
    MAX_BASELINES = nr_baselines(MAX_INPUTS)
    # Expected block size is BLOCK_LENGTH x BLOCK_LENGTH
    BLOCK_LENGTH = 12
    # Expected number of blocks: enough to cover all baselines without the conjugates
    # (that is, the top-left triangle of the matrix).
    MAX_BLOCKS = nr_baselines(MAX_INPUTS // BLOCK_LENGTH)
    # Maximum number of subbands we support (used to determine array sizes)
    MAX_SUBBANDS = 512
    # Complex values are (real, imag). A bit silly, but we don't want magical
    # constants.
    VALUES_PER_COMPLEX = 2

    def __init__(
        self,
        nr_signal_inputs: int = MAX_INPUTS,
        first_signal_input_index: int = 0,
        nr_parallel_subbands: int = MAX_PARALLEL_SUBBANDS,
    ):
        """Initialise the collector for a window of signal inputs.

        Raises ValueError if nr_signal_inputs is not a multiple of
        BLOCK_LENGTH, since packets arrive as whole BLOCK_LENGTH x
        BLOCK_LENGTH blocks.
        """
        if nr_signal_inputs % self.BLOCK_LENGTH != 0:
            # This restriction could be lifted if we slice sub blocks out
            # of the XST packets.
            raise ValueError(
                f"Number of signal inputs must be a multiple of {self.BLOCK_LENGTH}, "
                f"but is {nr_signal_inputs}"
            )

        # Record these before the base class builds the parameters, as
        # _default_parameters() depends on them.
        self.nr_signal_inputs = nr_signal_inputs
        self.first_signal_input_index = first_signal_input_index
        self.nr_parallel_subbands = nr_parallel_subbands
        super().__init__()

    @property
    def nr_blocks(self):
        """Number of blocks that contain XSTs for our signal inputs."""
        return nr_baselines(self.nr_signal_inputs // self.BLOCK_LENGTH)

    def _default_parameters(self):
        """Extend the base parameters with the XST-specific matrices."""
        defaults = super()._default_parameters()

        defaults.update(
            {
                # Number of packets received so far that we could parse correctly
                # and do not have a payload error
                "nof_valid_payloads": numpy.zeros(
                    (self.MAX_FPGAS,), dtype=numpy.uint64
                ),
                # Packets that reported a payload error
                "nof_payload_errors": numpy.zeros(
                    (self.MAX_FPGAS,), dtype=numpy.uint64
                ),
                # Last value array we've constructed out of the packets
                "xst_blocks": numpy.zeros(
                    (
                        self.nr_parallel_subbands,
                        self.nr_blocks,
                        self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX,
                    ),
                    dtype=numpy.int64,
                ),
                # Whether the values are actually conjugated and transposed
                "xst_conjugated": numpy.zeros(
                    (
                        self.nr_parallel_subbands,
                        self.nr_blocks,
                    ),
                    dtype=bool,
                ),
                # When the youngest data for each subband was received
                "xst_timestamps": numpy.zeros(
                    (self.nr_parallel_subbands,), dtype=numpy.float64
                ),
                # Which subband occupies each slot
                "xst_subbands": numpy.zeros(
                    (self.nr_parallel_subbands,), dtype=numpy.uint16
                ),
                "xst_integration_intervals": numpy.zeros(
                    (self.nr_parallel_subbands,), dtype=numpy.float32
                ),
            }
        )

        return defaults

    def select_subband_slot(self, subband):
        """Return which subband slot (0..nr_parallel_subbands) to use

        Selects which frontend to use if it is a new subband.
        Keep recording the same subband if we're already tracking it, but
        allocate or replace a slot if not.
        """
        indices = numpy.where(self.parameters["xst_subbands"] == subband)[0]

        if len(indices) > 0:
            # subband already being recorded, use same spot
            return indices[0]

        # a new subband, kick out the oldest
        oldest_timestamp = self.parameters["xst_timestamps"].min()

        # prefer the first one in case of multiple minima
        return numpy.where(self.parameters["xst_timestamps"] == oldest_timestamp)[0][0]

    def _parse_packet(self, packet):
        """Fold one XST packet into the rolling per-subband administration.

        Raises ValueError if the packet's block shape, baseline range, or
        baseline alignment does not match what we can store.
        """
        fields = XSTPacket(packet)
        fpga_nr = self.gn_index_to_fpga_nr(fields.gn_index)

        if fields.payload_error:
            # cannot trust the data if a payload error is reported
            self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1)

            # don't raise, as packet is valid
            return

        # the blocks must be of size BLOCK_LENGTH x BLOCK_LENGTH
        if fields.nof_signal_inputs != self.BLOCK_LENGTH:
            raise ValueError(
                f"Packet describes a block of {fields.nof_signal_inputs} x "
                f"{fields.nof_signal_inputs} baselines, but we can only parse blocks "
                f"of {self.BLOCK_LENGTH} x {self.BLOCK_LENGTH} baselines"
            )

        # check whether set of baselines in this packet are not out of bounds
        for antenna in (0, 1):
            if not (
                0
                <= fields.first_baseline[antenna]
                + fields.nof_signal_inputs
                - self.first_signal_input_index
                <= self.nr_signal_inputs
            ):
                # packet describes an input that is out of bounds for us
                raise ValueError(
                    f"Packet describes {fields.nof_signal_inputs} x "
                    f"{fields.nof_signal_inputs} baselines starting at "
                    f"{fields.first_baseline}, but we are limited to describing "
                    f"{self.nr_signal_inputs} starting at offset "
                    f"{self.first_signal_input_index}"
                )

            # the blocks of baselines need to be tightly packed, and thus be provided
            # at exact intervals
            if fields.first_baseline[antenna] % self.BLOCK_LENGTH != 0:
                raise ValueError(
                    f"Packet describes baselines starting at {fields.first_baseline}, "
                    f"but we require a multiple of BLOCK_LENGTH={self.BLOCK_LENGTH}"
                )

        # Make sure we always have a baseline (a,b) with a>=b. If not, we swap the
        # indices and mark that the data must be conjugated and transposed when
        # processed.
        first_baseline = fields.first_baseline
        if first_baseline[0] < first_baseline[1]:
            conjugated = True
            first_baseline = (first_baseline[1], first_baseline[0])
        else:
            conjugated = False

        # Adjust for our offset
        first_baseline = (
            first_baseline[0] - self.first_signal_input_index,
            first_baseline[1] - self.first_signal_input_index,
        )

        # we keep track of multiple subbands. select slot for this one
        subband_slot = self.select_subband_slot(fields.subband_index)

        assert 0 <= subband_slot < self.nr_parallel_subbands, (
            f"Selected slot {subband_slot}, but only have room for "
            f"{self.nr_parallel_subbands}. Existing slots are "
            f"{self.parameters['xst_subbands']}, processing subband "
            f"{fields.subband_index}."
        )

        # the payload contains complex values for the block of baselines of size
        # BLOCK_LENGTH x BLOCK_LENGTH starting at baseline first_baseline.
        #
        # we honour this format, as we want to keep the metadata together with these
        # blocks. we do need to put the blocks in a linear and tight order, however,
        # so we calculate a block index.
        block_index = baseline_index(
            first_baseline[0] // self.BLOCK_LENGTH,
            first_baseline[1] // self.BLOCK_LENGTH,
        )

        # We did enough checks on first_baseline for this to be a logic error in our
        # code
        assert 0 <= block_index < self.nr_blocks, (
            f"Received block {block_index}, but have only room for {self.nr_blocks}"
            f". Block starts at baseline {first_baseline}."
        )

        # process the packet
        self.parameters["nof_valid_payloads"][fpga_nr] += numpy.uint64(1)
        self.parameters["xst_blocks"][
            subband_slot, block_index, : fields.nof_statistics_per_packet
        ] = fields.payload()
        self.parameters["xst_timestamps"][subband_slot] = numpy.float64(
            fields.timestamp().timestamp()
        )
        self.parameters["xst_conjugated"][subband_slot, block_index] = conjugated
        self.parameters["xst_subbands"][subband_slot] = numpy.uint16(
            fields.subband_index
        )
        self.parameters["xst_integration_intervals"][
            subband_slot
        ] = fields.integration_interval()

    def xst_values(self, subband_indices=None):
        """xst_blocks, but as a matrix[len(subband_indices)][MAX_INPUTS][MAX_INPUTS]

        The subband indices must be in [0..nr_parallel_subbands). By default, all
        recorded XSTs are returned.
        """
        if subband_indices is None:
            subband_indices = range(self.nr_parallel_subbands)

        matrix = numpy.zeros(
            (len(subband_indices), self.nr_signal_inputs, self.nr_signal_inputs),
            dtype=numpy.complex64,
        )

        xst_blocks = self.parameters["xst_blocks"]
        xst_conjugated = self.parameters["xst_conjugated"]

        for matrix_idx, subband_index in enumerate(subband_indices):
            for block_index in range(self.nr_blocks):
                # convert real/imag int to complex float values. this works as
                # real/imag come in pairs
                block = (
                    xst_blocks[subband_index][block_index]
                    .astype(numpy.float32)
                    .view(numpy.complex64)
                )

                # reshape into [a][b]
                block = block.reshape(self.BLOCK_LENGTH, self.BLOCK_LENGTH)

                if xst_conjugated[subband_index][block_index]:
                    # block is conjugated and transposed. process.
                    block = block.conjugate().transpose()

                # compute destination in matrix
                first_baseline = baseline_from_index(block_index)
                first_baseline = (
                    first_baseline[0] * self.BLOCK_LENGTH,
                    first_baseline[1] * self.BLOCK_LENGTH,
                )

                # copy block into matrix
                matrix[matrix_idx][
                    first_baseline[0] : first_baseline[0] + self.BLOCK_LENGTH,
                    first_baseline[1] : first_baseline[1] + self.BLOCK_LENGTH,
                ] = block

        return matrix
class BSTCollector(StatisticsCollector):
    """Class to process BST statistics packets."""

    # Number of beamlets in the statistics. Each beamlet carries N_pol
    # (x and y) polarisations in the second axis of "bst_values".
    MAX_BEAMLETS = 488

    def _default_parameters(self):
        """Extend the base parameters with the BST-specific matrices."""
        defaults = super()._default_parameters()

        defaults.update(
            {
                # Number of packets received so far that we could parse correctly and
                # do not have a payload error
                "nof_valid_payloads": numpy.zeros(
                    (self.MAX_FPGAS,),
                    dtype=numpy.uint64,
                ),
                # Packets that reported a payload error
                "nof_payload_errors": numpy.zeros(
                    (self.MAX_FPGAS,),
                    dtype=numpy.uint64,
                ),
                # Last value array we've constructed out of the packets
                "bst_values": numpy.zeros(
                    (self.MAX_BEAMLETS, N_pol),
                    dtype=numpy.uint64,
                ),
                # When each FPGA last contributed beamlet data
                "bst_timestamps": numpy.zeros(
                    (self.MAX_FPGAS,),
                    dtype=numpy.float64,
                ),
                "integration_intervals": numpy.zeros(
                    (self.MAX_FPGAS,),
                    dtype=numpy.float32,
                ),
            }
        )

        return defaults

    def _parse_packet(self, packet):
        """Fold one BST packet into the parameter matrices.

        Raises ValueError if the packet describes beamlets beyond
        MAX_BEAMLETS.
        """
        fields = BSTPacket(packet)
        fpga_nr = self.gn_index_to_fpga_nr(fields.gn_index)

        # number of beamlets in the packet
        beamlets = fields.payload()
        nr_beamlets = beamlets.shape[0]

        first_beamlet = fields.beamlet_index
        last_beamlet = first_beamlet + nr_beamlets

        # determine which input this packet contains data for
        if last_beamlet > self.MAX_BEAMLETS:
            # packet describes an input that is out of bounds for us
            raise ValueError(
                f"Packet describes {nr_beamlets} beamlets starting at "
                f"{fields.beamlet_index}, but we are limited "
                f"to describing MAX_BEAMLETS={self.MAX_BEAMLETS}"
            )

        if fields.payload_error:
            # cannot trust the data if a payload error is reported
            self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1)

            # don't raise, as packet is valid
            return

        # process the packet
        self.parameters["nof_valid_payloads"][fpga_nr] += numpy.uint64(1)
        self.parameters["bst_values"][first_beamlet:last_beamlet] = beamlets
        self.parameters["bst_timestamps"][fpga_nr] = numpy.float64(
            fields.timestamp().timestamp()
        )
        self.parameters["integration_intervals"][
            fpga_nr
        ] = fields.integration_interval()