diff --git a/README.md b/README.md index 21b80731035e101eb9fe8d1b5549f3b95efecf9b..2060adfcc2b2a92a9e8e2d7211c29755a8b5f423 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,7 @@ tox -e debug tests.requests.test_prometheus ``` ## Releasenotes +- 0.15.8 - SST/XST: Account for a signal index offset (HBA1) when collecting statistics - 0.15.7 - Fix: Recording LBA statistics does not request HBA-only metadata - Fix: Syntax error when querying SDP metadata - 0.15.6 - Represent BSTs in 488x2 arrays diff --git a/VERSION b/VERSION index 3b1c7940850144076322076877708039ed967be1..1e1a04ab70bd7071a50a26bd0637ec1539c76084 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.15.7 +0.15.8 diff --git a/lofar_station_client/statistics/collector.py b/lofar_station_client/statistics/collector.py index 27e191fa996417509f45951202aaf0193012ab84..e8a25d4b9fad688faa688ef33466aa2c3aad1a17 100644 --- a/lofar_station_client/statistics/collector.py +++ b/lofar_station_client/statistics/collector.py @@ -124,6 +124,14 @@ class SSTCollector(StatisticsCollector): # Maximum number of subbands we support (used to determine array sizes) MAX_SUBBANDS = 512 + def __init__( + self, nr_signal_inputs: int = MAX_INPUTS, first_signal_input_index: int = 0 + ): + self.nr_signal_inputs = nr_signal_inputs + self.first_signal_input_index = first_signal_input_index + + super().__init__() + def _default_parameters(self): defaults = super()._default_parameters() @@ -140,17 +148,17 @@ class SSTCollector(StatisticsCollector): ), # Last value array we've constructed out of the packets "sst_values": numpy.zeros( - (self.MAX_INPUTS, self.MAX_SUBBANDS), dtype=numpy.uint64 + (self.nr_signal_inputs, self.MAX_SUBBANDS), dtype=numpy.uint64 + ), + "sst_timestamps": numpy.zeros( + (self.nr_signal_inputs,), dtype=numpy.float64 ), - "sst_timestamps": numpy.zeros((self.MAX_INPUTS,), dtype=numpy.float64), "integration_intervals": numpy.zeros( - (self.MAX_INPUTS,), dtype=numpy.float32 + (self.nr_signal_inputs,), dtype=numpy.float32 + ), + "subbands_calibrated": numpy.zeros( + (self.nr_signal_inputs,), dtype=bool ), - "subbands_calibrated": numpy.zeros((self.MAX_INPUTS,), dtype=bool), - # RECV attribute values to monitor the station configuration - "rcu_attenuator_dB": numpy.zeros((32, 3), dtype=numpy.int64), - "rcu_band_select": numpy.zeros((32, 3), dtype=numpy.int64), - "rcu_dth_on": numpy.full((32, 3), False), } ) @@ -161,16 +169,17 @@ class SSTCollector(StatisticsCollector): fpga_nr = self.gn_index_to_fpga_nr(fields.gn_index) + input_index = fields.signal_input_index - self.first_signal_input_index + # determine which input this packet contains data for - if fields.signal_input_index >= self.MAX_INPUTS: + if not 0 <= input_index < self.nr_signal_inputs: # packet describes an input that is out of bounds for us raise ValueError( f"Packet describes input {fields.signal_input_index}, but we are " - f"limited to describing MAX_INPUTS={self.MAX_INPUTS}" + f"limited to describing {self.nr_signal_inputs} starting at index " + f"{self.first_signal_input_index}" ) - input_index = fields.signal_input_index - if fields.payload_error: # cannot trust the data if a payload error is reported self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1) @@ -233,6 +242,27 @@ class XSTCollector(StatisticsCollector): # constants. VALUES_PER_COMPLEX = 2 + def __init__( + self, nr_signal_inputs: int = MAX_INPUTS, first_signal_input_index: int = 0 + ): + if nr_signal_inputs % self.BLOCK_LENGTH != 0: + # This restriction could be lifted if we slice sub blocks out + # of the XST packets. + raise ValueError( + f"Number of signal inputs must be a multiple of {self.BLOCK_LENGTH}, " + f"but is {nr_signal_inputs}" + ) + + self.nr_signal_inputs = nr_signal_inputs + self.first_signal_input_index = first_signal_input_index + + super().__init__() + + @property + def nr_blocks(self): + """Number of blocks that contain XSTs for our signal inputs.""" + return nr_baselines(self.nr_signal_inputs // self.BLOCK_LENGTH) + def _default_parameters(self): defaults = super()._default_parameters() @@ -251,7 +281,7 @@ class XSTCollector(StatisticsCollector): "xst_blocks": numpy.zeros( ( self.MAX_PARALLEL_SUBBANDS, - self.MAX_BLOCKS, + self.nr_blocks, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX, ), dtype=numpy.int64, @@ -260,7 +290,7 @@ class XSTCollector(StatisticsCollector): "xst_conjugated": numpy.zeros( ( self.MAX_PARALLEL_SUBBANDS, - self.MAX_BLOCKS, + self.nr_blocks, ), dtype=bool, ), @@ -321,16 +351,19 @@ class XSTCollector(StatisticsCollector): # check whether set of baselines in this packet are not out of bounds for antenna in (0, 1): - if ( - fields.first_baseline[antenna] + fields.nof_signal_inputs - > self.MAX_INPUTS + if not ( + fields.first_baseline[antenna] + + fields.nof_signal_inputs + - self.first_signal_input_index + <= self.nr_signal_inputs ): # packet describes an input that is out of bounds for us raise ValueError( - f"Packet describes {fields.nof_signal_inputs} x" - f"{fields.nof_signal_inputs} baselines starting at" - f"{fields.first_baseline}, but we are limited to describing" - f"MAX_INPUTS={self.MAX_INPUTS}" + f"Packet describes {fields.nof_signal_inputs} x " + f"{fields.nof_signal_inputs} baselines starting at " + f"{fields.first_baseline}, but we are limited to describing " + f"{self.nr_signal_inputs} starting at offset " + f"{self.first_signal_input_index}" ) # the blocks of baselines need to be tightly packed, and thus be provided @@ -338,7 +371,7 @@ class XSTCollector(StatisticsCollector): if fields.first_baseline[antenna] % self.BLOCK_LENGTH != 0: raise ValueError( f"Packet describes baselines starting at {fields.first_baseline}, " - f"but we require a multiple of BLOCK_LENGTH={self.MAX_INPUTS}" + f"but we require a multiple of BLOCK_LENGTH={self.BLOCK_LENGTH}" ) # Make sure we always have a baseline (a,b) with a>=b. If not, we swap the @@ -351,6 +384,12 @@ class XSTCollector(StatisticsCollector): else: conjugated = False + # Adjust for our offset + first_baseline = ( + first_baseline[0] - self.first_signal_input_index, + first_baseline[1] - self.first_signal_input_index, + ) + # we keep track of multiple subbands. select slot for this one subband_slot = self.select_subband_slot(fields.subband_index) @@ -374,8 +413,8 @@ class XSTCollector(StatisticsCollector): # We did enough checks on first_baseline for this to be a logic error in our # code - assert 0 <= block_index < self.MAX_BLOCKS, ( - f"Received block {block_index}, but have only room for {self.MAX_BLOCKS}" + assert 0 <= block_index < self.nr_blocks, ( + f"Received block {block_index}, but have only room for {self.nr_blocks}" f". Block starts at baseline {first_baseline}." ) @@ -407,14 +446,14 @@ class XSTCollector(StatisticsCollector): subband_indices = range(self.MAX_PARALLEL_SUBBANDS) matrix = numpy.zeros( - (len(subband_indices), self.MAX_INPUTS, self.MAX_INPUTS), + (len(subband_indices), self.nr_signal_inputs, self.nr_signal_inputs), dtype=numpy.complex64, ) xst_blocks = self.parameters["xst_blocks"] xst_conjugated = self.parameters["xst_conjugated"] for matrix_idx, subband_index in enumerate(subband_indices): - for block_index in range(self.MAX_BLOCKS): + for block_index in range(self.nr_blocks): # convert real/imag int to complex float values. this works as # real/imag come in pairs block = ( @@ -515,6 +554,7 @@ class BSTCollector(StatisticsCollector): # process the packet self.parameters["nof_valid_payloads"][fpga_nr] += numpy.uint64(1) + self.parameters["bst_values"][first_beamlet:last_beamlet] = beamlets self.parameters["bst_timestamps"][fpga_nr] = numpy.float64( fields.timestamp().timestamp() diff --git a/lofar_station_client/statistics/writer/VERSION b/lofar_station_client/statistics/writer/VERSION index be586341736ee60d6ca2be0f3762a307e8fe79f9..bd73f47072b1fe4b9914ec14a7f6d47fcc8f816a 100644 --- a/lofar_station_client/statistics/writer/VERSION +++ b/lofar_station_client/statistics/writer/VERSION @@ -1 +1 @@ -0.3 +0.4 diff --git a/lofar_station_client/statistics/writer/entry.py b/lofar_station_client/statistics/writer/entry.py index 3b15ca9c22c882d5891c0d8396af3926724fee96..2c9e1faec5f0e436ae8a451429ed51e0e8266075 100644 --- a/lofar_station_client/statistics/writer/entry.py +++ b/lofar_station_client/statistics/writer/entry.py @@ -11,7 +11,7 @@ import argparse import logging import sys import time -from typing import Dict +from typing import Dict, NamedTuple from tango import DeviceProxy @@ -122,6 +122,37 @@ def _create_receiver(filename, host, port): sys.exit(1) +class SignalInputConfig(NamedTuple): + """Configuration parameters with respect to + SDP's representation of its signal inputs.""" + + nr_signal_inputs: int + first_signal_input_index: int + + +def _signal_input_config( + devices: Dict[str, DeviceProxy], +) -> SignalInputConfig: + if "sdpfirmware" in devices: + try: + sdpfirmware_proxy = devices["sdpfirmware"] + return SignalInputConfig( + nr_signal_inputs=sdpfirmware_proxy.nr_signal_inputs_R, + first_signal_input_index=sdpfirmware_proxy.first_signal_input_index_R, + ) + except (Exception,): + logger.exception( + "Could not determine signal input configuration. Reverting to default." + ) + else: + logger.warning( + "Could not determine signal input configuration. Reverting to default." + ) + + # use a default big enough for all fields on all stations + return SignalInputConfig(nr_signal_inputs=192, first_signal_input_index=0) + + def _create_writer( mode, interval, @@ -131,18 +162,24 @@ def _create_writer( ): """Create the writer""" if mode == "XST": + sic = _signal_input_config(devices) return ParallelXstHdf5Writer( new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation, devices=devices, + nr_signal_inputs=sic.nr_signal_inputs, + first_signal_input_index=sic.first_signal_input_index, ) if mode == "SST": + sic = _signal_input_config(devices) return SstHdf5Writer( new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation, devices=devices, + nr_signal_inputs=sic.nr_signal_inputs, + first_signal_input_index=sic.first_signal_input_index, ) if mode == "BST": return BstHdf5Writer( diff --git a/lofar_station_client/statistics/writer/hdf5.py b/lofar_station_client/statistics/writer/hdf5.py index c232acf42fd94b74cf310075959a1c525643887b..15d06a55fff4b5263e8262d3a29e5a07bcbf85af 100644 --- a/lofar_station_client/statistics/writer/hdf5.py +++ b/lofar_station_client/statistics/writer/hdf5.py @@ -551,6 +551,8 @@ class SstHdf5Writer(HDF5Writer): file_location, decimation_factor, devices: Dict[str, DeviceProxy], + nr_signal_inputs, + first_signal_input_index, ): super().__init__( new_file_time_interval, @@ -559,12 +561,14 @@ class SstHdf5Writer(HDF5Writer): decimation_factor, devices=devices, ) + self.nr_signal_inputs = nr_signal_inputs + self.first_signal_input_index = first_signal_input_index def decoder(self, packet): return SSTPacket(packet) def new_collector(self): - return SSTCollector() + return SSTCollector(self.nr_signal_inputs, self.first_signal_input_index) def get_matrix(self) -> StatisticsData: matrix = super().get_matrix() @@ -647,6 +651,8 @@ class XstHdf5Writer(HDF5Writer): decimation_factor, devices: Dict[str, DeviceProxy], subband_index, + nr_signal_inputs, + first_signal_input_index, ): super().__init__( new_file_time_interval, @@ -656,12 +662,14 @@ class XstHdf5Writer(HDF5Writer): devices=devices, ) self.subband_index = subband_index + self.nr_signal_inputs = nr_signal_inputs + self.first_signal_input_index = first_signal_input_index def decoder(self, packet): return XSTPacket(packet) def new_collector(self): - return XSTCollector() + return XSTCollector(self.nr_signal_inputs, self.first_signal_input_index) def next_filename(self, timestamp, suffix=".h5"): time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S")) @@ -706,6 +714,8 @@ class ParallelXstHdf5Writer: file_location, decimation_factor, devices: Dict[str, DeviceProxy], + nr_signal_inputs, + first_signal_input_index, ): # maintain a dedicated HDF5Writer per subband self.writers = {} @@ -719,6 +729,8 @@ class ParallelXstHdf5Writer: decimation_factor, devices, subband, + nr_signal_inputs, + first_signal_input_index, ) self.new_writer = new_writer diff --git a/tests/statistics/test_collector.py b/tests/statistics/test_collector.py index c9644a5579e285103b0e8a3c117ddb2f288bb8ea..8a53f89b7cfdef9bbfa4e95ce419f0c7948d63cf 100644 --- a/tests/statistics/test_collector.py +++ b/tests/statistics/test_collector.py @@ -345,6 +345,59 @@ class TestXSTCollector(base.TestCase): self.assertEqual(1, collector.parameters["nof_payload_errors"][0]) self.assertEqual(fpga_index, collector.parameters["gn_indices"][0]) + def test_signal_index_offset(self): + nr_signal_inputs = 24 + first_signal_input = 12 + + collector = XSTCollector(nr_signal_inputs, first_signal_input) + + # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at + # (12,12) + packet = ( + b"X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x0c\x0c" + b"\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3" + + 288 * b"\x00\x00\x00\x00\x00\x00\x00\x01" + ) + + # parse it ourselves to extract info nicely + fields = XSTPacket(packet) + + # verify we constructed a useful packet for this test + self.assertEqual( + (first_signal_input, first_signal_input), fields.first_baseline + ) + + # this should not throw + collector.process_packet(packet) + + # check whether the data ended up in the right block, and the rest is still zero + xst_values = collector.xst_values()[0] + + # XST shape should be equal to number of inputs + self.assertEqual((nr_signal_inputs, nr_signal_inputs), xst_values.shape) + + for baseline_a in range(nr_signal_inputs): + for baseline_b in range(nr_signal_inputs): + if baseline_b > baseline_a: + # only scan top-left triangle + continue + + # the packet provides baselines (0,0) - (12,12) + if 0 <= baseline_a < 12 and 0 <= baseline_b < 12: + self.assertEqual( + 1 + 1j, + xst_values[baseline_a][baseline_b], + msg=f"element [{baseline_a}][{baseline_b}] did not end up in " + f"XST matrix.", + ) + else: + self.assertEqual( + 0 + 0j, + xst_values[baseline_a][baseline_b], + msg=f"element [{baseline_a}][{baseline_b}] was not in packet, " + f"but was written to the XST matrix.", + ) + class TestBSTCollector(base.TestCase): def test_valid_packet(self): diff --git a/tests/test_devices.py b/tests/test_devices.py index d0e0741623642adebf4b0568592e36eb5aebc42e..6a5d2124bf2b5d8e12085441bb59c9dd4f599141 100644 --- a/tests/test_devices.py +++ b/tests/test_devices.py @@ -332,6 +332,9 @@ class FakeSDPFirmwareDeviceProxy: FPGA_firmware_version_R = ["firmware version"] * N_pn FPGA_hardware_version_R = ["hardware version"] * N_pn + nr_signal_inputs_R = 192 + first_signal_input_index_R = 0 + def __init__(self, name): self._name = name