Skip to content
Snippets Groups Projects
Commit d19cdd6a authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'fix-timestamps-refactor-matrix-construction' into 'main'

L2SS-1469: Support variable offset & number of signal inputs

See merge request !61
parents d6a9eab5 f818b66d
No related branches found
No related tags found
1 merge request!61L2SS-1469: Support variable offset & number of signal inputs
Pipeline #55449 passed
...@@ -105,6 +105,7 @@ tox -e debug tests.requests.test_prometheus ...@@ -105,6 +105,7 @@ tox -e debug tests.requests.test_prometheus
``` ```
## Releasenotes ## Releasenotes
- 0.15.8 - SST/XST: Account for a signal index offset (HBA1) when collecting statistics
- 0.15.7 - Fix: Recording LBA statistics does not request HBA-only metadata - 0.15.7 - Fix: Recording LBA statistics does not request HBA-only metadata
- Fix: Syntax error when querying SDP metadata - Fix: Syntax error when querying SDP metadata
- 0.15.6 - Represent BSTs in 488x2 arrays - 0.15.6 - Represent BSTs in 488x2 arrays
......
0.15.7 0.15.8
...@@ -124,6 +124,14 @@ class SSTCollector(StatisticsCollector): ...@@ -124,6 +124,14 @@ class SSTCollector(StatisticsCollector):
# Maximum number of subbands we support (used to determine array sizes) # Maximum number of subbands we support (used to determine array sizes)
MAX_SUBBANDS = 512 MAX_SUBBANDS = 512
def __init__(
self, nr_signal_inputs: int = MAX_INPUTS, first_signal_input_index: int = 0
):
self.nr_signal_inputs = nr_signal_inputs
self.first_signal_input_index = first_signal_input_index
super().__init__()
def _default_parameters(self): def _default_parameters(self):
defaults = super()._default_parameters() defaults = super()._default_parameters()
...@@ -140,17 +148,17 @@ class SSTCollector(StatisticsCollector): ...@@ -140,17 +148,17 @@ class SSTCollector(StatisticsCollector):
), ),
# Last value array we've constructed out of the packets # Last value array we've constructed out of the packets
"sst_values": numpy.zeros( "sst_values": numpy.zeros(
(self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64 (self.nr_signal_inputs, self.MAX_SUBBANDS), dtype=numpy.uint64
),
"sst_timestamps": numpy.zeros(
(self.nr_signal_inputs,), dtype=numpy.float64
), ),
"sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64),
"integration_intervals": numpy.zeros( "integration_intervals": numpy.zeros(
(self.MAX_INPUTS,), dtype=numpy.float32 (self.nr_signal_inputs,), dtype=numpy.float32
),
"subbands_calibrated": numpy.zeros(
(self.nr_signal_inputs,), dtype=bool
), ),
"subbands_calibrated": numpy.zeros((self.MAX_INPUTS,), dtype=bool),
# RECV attribute values to monitor the station configuration
"rcu_attenuator_dB": numpy.zeros((32, 3), dtype=numpy.int64),
"rcu_band_select": numpy.zeros((32, 3), dtype=numpy.int64),
"rcu_dth_on": numpy.full((32, 3), False),
} }
) )
...@@ -161,16 +169,17 @@ class SSTCollector(StatisticsCollector): ...@@ -161,16 +169,17 @@ class SSTCollector(StatisticsCollector):
fpga_nr = self.gn_index_to_fpga_nr(fields.gn_index) fpga_nr = self.gn_index_to_fpga_nr(fields.gn_index)
input_index = fields.signal_input_index - self.first_signal_input_index
# determine which input this packet contains data for # determine which input this packet contains data for
if fields.signal_input_index >= self.MAX_INPUTS: if not 0 <= input_index < self.nr_signal_inputs:
# packet describes an input that is out of bounds for us # packet describes an input that is out of bounds for us
raise ValueError( raise ValueError(
f"Packet describes input {fields.signal_input_index}, but we are " f"Packet describes input {fields.signal_input_index}, but we are "
f"limited to describing MAX_INPUTS={self.MAX_INPUTS}" f"limited to describing {self.nr_signal_inputs} starting at index "
f"{self.first_signal_input_index}"
) )
input_index = fields.signal_input_index
if fields.payload_error: if fields.payload_error:
# cannot trust the data if a payload error is reported # cannot trust the data if a payload error is reported
self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1) self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1)
...@@ -233,6 +242,27 @@ class XSTCollector(StatisticsCollector): ...@@ -233,6 +242,27 @@ class XSTCollector(StatisticsCollector):
# constants. # constants.
VALUES_PER_COMPLEX = 2 VALUES_PER_COMPLEX = 2
def __init__(
self, nr_signal_inputs: int = MAX_INPUTS, first_signal_input_index: int = 0
):
if nr_signal_inputs % self.BLOCK_LENGTH != 0:
# This restriction could be lifted if we slice sub blocks out
# of the XST packets.
raise ValueError(
f"Number of signal inputs must be a multiple of {self.BLOCK_LENGTH}, "
f"but is {nr_signal_inputs}"
)
self.nr_signal_inputs = nr_signal_inputs
self.first_signal_input_index = first_signal_input_index
super().__init__()
@property
def nr_blocks(self):
"""Number of blocks that contain XSTs for our signal inputs."""
return nr_baselines(self.nr_signal_inputs // self.BLOCK_LENGTH)
def _default_parameters(self): def _default_parameters(self):
defaults = super()._default_parameters() defaults = super()._default_parameters()
...@@ -251,7 +281,7 @@ class XSTCollector(StatisticsCollector): ...@@ -251,7 +281,7 @@ class XSTCollector(StatisticsCollector):
"xst_blocks": numpy.zeros( "xst_blocks": numpy.zeros(
( (
self.MAX_PARALLEL_SUBBANDS, self.MAX_PARALLEL_SUBBANDS,
self.MAX_BLOCKS, self.nr_blocks,
self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX,
), ),
dtype=numpy.int64, dtype=numpy.int64,
...@@ -260,7 +290,7 @@ class XSTCollector(StatisticsCollector): ...@@ -260,7 +290,7 @@ class XSTCollector(StatisticsCollector):
"xst_conjugated": numpy.zeros( "xst_conjugated": numpy.zeros(
( (
self.MAX_PARALLEL_SUBBANDS, self.MAX_PARALLEL_SUBBANDS,
self.MAX_BLOCKS, self.nr_blocks,
), ),
dtype=bool, dtype=bool,
), ),
...@@ -321,16 +351,19 @@ class XSTCollector(StatisticsCollector): ...@@ -321,16 +351,19 @@ class XSTCollector(StatisticsCollector):
# check whether set of baselines in this packet are not out of bounds # check whether set of baselines in this packet are not out of bounds
for antenna in (0, 1): for antenna in (0, 1):
if ( if not (
fields.first_baseline[antenna] + fields.nof_signal_inputs fields.first_baseline[antenna]
> self.MAX_INPUTS + fields.nof_signal_inputs
- self.first_signal_input_index
<= self.nr_signal_inputs
): ):
# packet describes an input that is out of bounds for us # packet describes an input that is out of bounds for us
raise ValueError( raise ValueError(
f"Packet describes {fields.nof_signal_inputs} x " f"Packet describes {fields.nof_signal_inputs} x "
f"{fields.nof_signal_inputs} baselines starting at " f"{fields.nof_signal_inputs} baselines starting at "
f"{fields.first_baseline}, but we are limited to describing " f"{fields.first_baseline}, but we are limited to describing "
f"MAX_INPUTS={self.MAX_INPUTS}" f"{self.nr_signal_inputs} starting at offset "
f"{self.first_signal_input_index}"
) )
# the blocks of baselines need to be tightly packed, and thus be provided # the blocks of baselines need to be tightly packed, and thus be provided
...@@ -338,7 +371,7 @@ class XSTCollector(StatisticsCollector): ...@@ -338,7 +371,7 @@ class XSTCollector(StatisticsCollector):
if fields.first_baseline[antenna] % self.BLOCK_LENGTH != 0: if fields.first_baseline[antenna] % self.BLOCK_LENGTH != 0:
raise ValueError( raise ValueError(
f"Packet describes baselines starting at {fields.first_baseline}, " f"Packet describes baselines starting at {fields.first_baseline}, "
f"but we require a multiple of BLOCK_LENGTH={self.MAX_INPUTS}" f"but we require a multiple of BLOCK_LENGTH={self.BLOCK_LENGTH}"
) )
# Make sure we always have a baseline (a,b) with a>=b. If not, we swap the # Make sure we always have a baseline (a,b) with a>=b. If not, we swap the
...@@ -351,6 +384,12 @@ class XSTCollector(StatisticsCollector): ...@@ -351,6 +384,12 @@ class XSTCollector(StatisticsCollector):
else: else:
conjugated = False conjugated = False
# Adjust for our offset
first_baseline = (
first_baseline[0] - self.first_signal_input_index,
first_baseline[1] - self.first_signal_input_index,
)
# we keep track of multiple subbands. select slot for this one # we keep track of multiple subbands. select slot for this one
subband_slot = self.select_subband_slot(fields.subband_index) subband_slot = self.select_subband_slot(fields.subband_index)
...@@ -374,8 +413,8 @@ class XSTCollector(StatisticsCollector): ...@@ -374,8 +413,8 @@ class XSTCollector(StatisticsCollector):
# We did enough checks on first_baseline for this to be a logic error in our # We did enough checks on first_baseline for this to be a logic error in our
# code # code
assert 0 <= block_index < self.MAX_BLOCKS, ( assert 0 <= block_index < self.nr_blocks, (
f"Received block {block_index}, but have only room for {self.MAX_BLOCKS}" f"Received block {block_index}, but have only room for {self.nr_blocks}"
f". Block starts at baseline {first_baseline}." f". Block starts at baseline {first_baseline}."
) )
...@@ -407,14 +446,14 @@ class XSTCollector(StatisticsCollector): ...@@ -407,14 +446,14 @@ class XSTCollector(StatisticsCollector):
subband_indices = range(self.MAX_PARALLEL_SUBBANDS) subband_indices = range(self.MAX_PARALLEL_SUBBANDS)
matrix = numpy.zeros( matrix = numpy.zeros(
(len(subband_indices), self.MAX_INPUTS, self.MAX_INPUTS), (len(subband_indices), self.nr_signal_inputs, self.nr_signal_inputs),
dtype=numpy.complex64, dtype=numpy.complex64,
) )
xst_blocks = self.parameters["xst_blocks"] xst_blocks = self.parameters["xst_blocks"]
xst_conjugated = self.parameters["xst_conjugated"] xst_conjugated = self.parameters["xst_conjugated"]
for matrix_idx, subband_index in enumerate(subband_indices): for matrix_idx, subband_index in enumerate(subband_indices):
for block_index in range(self.MAX_BLOCKS): for block_index in range(self.nr_blocks):
# convert real/imag int to complex float values. this works as # convert real/imag int to complex float values. this works as
# real/imag come in pairs # real/imag come in pairs
block = ( block = (
...@@ -515,6 +554,7 @@ class BSTCollector(StatisticsCollector): ...@@ -515,6 +554,7 @@ class BSTCollector(StatisticsCollector):
# process the packet # process the packet
self.parameters["nof_valid_payloads"][fpga_nr] += numpy.uint64(1) self.parameters["nof_valid_payloads"][fpga_nr] += numpy.uint64(1)
self.parameters["bst_values"][first_beamlet:last_beamlet] = beamlets self.parameters["bst_values"][first_beamlet:last_beamlet] = beamlets
self.parameters["bst_timestamps"][fpga_nr] = numpy.float64( self.parameters["bst_timestamps"][fpga_nr] = numpy.float64(
fields.timestamp().timestamp() fields.timestamp().timestamp()
......
0.3 0.4
...@@ -11,7 +11,7 @@ import argparse ...@@ -11,7 +11,7 @@ import argparse
import logging import logging
import sys import sys
import time import time
from typing import Dict from typing import Dict, NamedTuple
from tango import DeviceProxy from tango import DeviceProxy
...@@ -122,6 +122,37 @@ def _create_receiver(filename, host, port): ...@@ -122,6 +122,37 @@ def _create_receiver(filename, host, port):
sys.exit(1) sys.exit(1)
class SignalInputConfig(NamedTuple):
"""Configuration parameters with respect to
SDP's representation of its signal inputs."""
nr_signal_inputs: int
first_signal_input_index: int
def _signal_input_config(
devices: Dict[str, DeviceProxy],
) -> SignalInputConfig:
if "sdpfirmware" in devices:
try:
sdpfirmware_proxy = devices["sdpfirmware"]
return SignalInputConfig(
nr_signal_inputs=sdpfirmware_proxy.nr_signal_inputs_R,
first_signal_input_index=sdpfirmware_proxy.first_signal_input_index_R,
)
except (Exception,):
logger.exception(
"Could not determine signal input configuration. Reverting to default."
)
else:
logger.warning(
"Could not determine signal input configuration. Reverting to default."
)
# use a default big enough for all fields on all stations
return SignalInputConfig(nr_signal_inputs=192, first_signal_input_index=0)
def _create_writer( def _create_writer(
mode, mode,
interval, interval,
...@@ -131,18 +162,24 @@ def _create_writer( ...@@ -131,18 +162,24 @@ def _create_writer(
): ):
"""Create the writer""" """Create the writer"""
if mode == "XST": if mode == "XST":
sic = _signal_input_config(devices)
return ParallelXstHdf5Writer( return ParallelXstHdf5Writer(
new_file_time_interval=interval, new_file_time_interval=interval,
file_location=output_dir, file_location=output_dir,
decimation_factor=decimation, decimation_factor=decimation,
devices=devices, devices=devices,
nr_signal_inputs=sic.nr_signal_inputs,
first_signal_input_index=sic.first_signal_input_index,
) )
if mode == "SST": if mode == "SST":
sic = _signal_input_config(devices)
return SstHdf5Writer( return SstHdf5Writer(
new_file_time_interval=interval, new_file_time_interval=interval,
file_location=output_dir, file_location=output_dir,
decimation_factor=decimation, decimation_factor=decimation,
devices=devices, devices=devices,
nr_signal_inputs=sic.nr_signal_inputs,
first_signal_input_index=sic.first_signal_input_index,
) )
if mode == "BST": if mode == "BST":
return BstHdf5Writer( return BstHdf5Writer(
......
...@@ -551,6 +551,8 @@ class SstHdf5Writer(HDF5Writer): ...@@ -551,6 +551,8 @@ class SstHdf5Writer(HDF5Writer):
file_location, file_location,
decimation_factor, decimation_factor,
devices: Dict[str, DeviceProxy], devices: Dict[str, DeviceProxy],
nr_signal_inputs,
first_signal_input_index,
): ):
super().__init__( super().__init__(
new_file_time_interval, new_file_time_interval,
...@@ -559,12 +561,14 @@ class SstHdf5Writer(HDF5Writer): ...@@ -559,12 +561,14 @@ class SstHdf5Writer(HDF5Writer):
decimation_factor, decimation_factor,
devices=devices, devices=devices,
) )
self.nr_signal_inputs = nr_signal_inputs
self.first_signal_input_index = first_signal_input_index
def decoder(self, packet): def decoder(self, packet):
return SSTPacket(packet) return SSTPacket(packet)
def new_collector(self): def new_collector(self):
return SSTCollector() return SSTCollector(self.nr_signal_inputs, self.first_signal_input_index)
def get_matrix(self) -> StatisticsData: def get_matrix(self) -> StatisticsData:
matrix = super().get_matrix() matrix = super().get_matrix()
...@@ -647,6 +651,8 @@ class XstHdf5Writer(HDF5Writer): ...@@ -647,6 +651,8 @@ class XstHdf5Writer(HDF5Writer):
decimation_factor, decimation_factor,
devices: Dict[str, DeviceProxy], devices: Dict[str, DeviceProxy],
subband_index, subband_index,
nr_signal_inputs,
first_signal_input_index,
): ):
super().__init__( super().__init__(
new_file_time_interval, new_file_time_interval,
...@@ -656,12 +662,14 @@ class XstHdf5Writer(HDF5Writer): ...@@ -656,12 +662,14 @@ class XstHdf5Writer(HDF5Writer):
devices=devices, devices=devices,
) )
self.subband_index = subband_index self.subband_index = subband_index
self.nr_signal_inputs = nr_signal_inputs
self.first_signal_input_index = first_signal_input_index
def decoder(self, packet): def decoder(self, packet):
return XSTPacket(packet) return XSTPacket(packet)
def new_collector(self): def new_collector(self):
return XSTCollector() return XSTCollector(self.nr_signal_inputs, self.first_signal_input_index)
def next_filename(self, timestamp, suffix=".h5"): def next_filename(self, timestamp, suffix=".h5"):
time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S")) time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
...@@ -706,6 +714,8 @@ class ParallelXstHdf5Writer: ...@@ -706,6 +714,8 @@ class ParallelXstHdf5Writer:
file_location, file_location,
decimation_factor, decimation_factor,
devices: Dict[str, DeviceProxy], devices: Dict[str, DeviceProxy],
nr_signal_inputs,
first_signal_input_index,
): ):
# maintain a dedicated HDF5Writer per subband # maintain a dedicated HDF5Writer per subband
self.writers = {} self.writers = {}
...@@ -719,6 +729,8 @@ class ParallelXstHdf5Writer: ...@@ -719,6 +729,8 @@ class ParallelXstHdf5Writer:
decimation_factor, decimation_factor,
devices, devices,
subband, subband,
nr_signal_inputs,
first_signal_input_index,
) )
self.new_writer = new_writer self.new_writer = new_writer
......
...@@ -345,6 +345,59 @@ class TestXSTCollector(base.TestCase): ...@@ -345,6 +345,59 @@ class TestXSTCollector(base.TestCase):
self.assertEqual(1, collector.parameters["nof_payload_errors"][0]) self.assertEqual(1, collector.parameters["nof_payload_errors"][0])
self.assertEqual(fpga_index, collector.parameters["gn_indices"][0]) self.assertEqual(fpga_index, collector.parameters["gn_indices"][0])
def test_signal_index_offset(self):
nr_signal_inputs = 24
first_signal_input = 12
collector = XSTCollector(nr_signal_inputs, first_signal_input)
# a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at
# (12,12)
packet = (
b"X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x0c\x0c"
b"\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3"
+ 288 * b"\x00\x00\x00\x00\x00\x00\x00\x01"
)
# parse it ourselves to extract info nicely
fields = XSTPacket(packet)
# verify we constructed a useful packet for this test
self.assertEqual(
(first_signal_input, first_signal_input), fields.first_baseline
)
# this should not throw
collector.process_packet(packet)
# check whether the data ended up in the right block, and the rest is still zero
xst_values = collector.xst_values()[0]
# XST shape should be equal to number of inputs
self.assertEqual((nr_signal_inputs, nr_signal_inputs), xst_values.shape)
for baseline_a in range(nr_signal_inputs):
for baseline_b in range(nr_signal_inputs):
if baseline_b > baseline_a:
# only scan top-left triangle
continue
# the packet provides baselines (0,0) - (12,12)
if 0 <= baseline_a < 12 and 0 <= baseline_b < 12:
self.assertEqual(
1 + 1j,
xst_values[baseline_a][baseline_b],
msg=f"element [{baseline_a}][{baseline_b}] did not end up in "
f"XST matrix.",
)
else:
self.assertEqual(
0 + 0j,
xst_values[baseline_a][baseline_b],
msg=f"element [{baseline_a}][{baseline_b}] was not in packet, "
f"but was written to the XST matrix.",
)
class TestBSTCollector(base.TestCase): class TestBSTCollector(base.TestCase):
def test_valid_packet(self): def test_valid_packet(self):
......
...@@ -332,6 +332,9 @@ class FakeSDPFirmwareDeviceProxy: ...@@ -332,6 +332,9 @@ class FakeSDPFirmwareDeviceProxy:
FPGA_firmware_version_R = ["firmware version"] * N_pn FPGA_firmware_version_R = ["firmware version"] * N_pn
FPGA_hardware_version_R = ["hardware version"] * N_pn FPGA_hardware_version_R = ["hardware version"] * N_pn
nr_signal_inputs_R = 192
first_signal_input_index_R = 0
def __init__(self, name): def __init__(self, name):
self._name = name self._name = name
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment