Skip to content
Snippets Groups Projects
Commit a2b539a3 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'flush' into 'main'

L2SS-1604: Statistics writers: flush & reduce memory usage

See merge request !79
parents c4b735e4 acfcc80f
No related branches found
No related tags found
1 merge request!79L2SS-1604: Statistics writers: flush & reduce memory usage
Pipeline #62948 passed
......@@ -131,6 +131,7 @@ tox -e debug tests.requests.test_prometheus
## Release notes
- 0.18.1 - Flush HDF5 files explicitly, reduce memory usage for XSTs
- 0.18.0 - MultiStationObservation and StationFutures allow multi field observations
- 0.17.3 - Fix hosts and ports to be compatible with consul overlay network
- 0.17.2 - Fix antennafield_device naming after separation in `AFL` and `AFH`
......
0.17.3
0.18.1
......@@ -32,6 +32,7 @@ class HdfFileWriter(HdfFileReader[T], FileWriter[T]):
def __init__(self, name, target_type, create):
self._create = create
self._hdf5_file = None
self.writers: list[HdfDataWriter] = []
super().__init__(name, target_type)
......@@ -47,6 +48,9 @@ class HdfFileWriter(HdfFileReader[T], FileWriter[T]):
writer.flush()
self.writers = []
if self._hdf5_file is not None:
self._hdf5_file.flush()
def close(self):
self.flush()
super().close()
......
......@@ -243,7 +243,10 @@ class XSTCollector(StatisticsCollector):
VALUES_PER_COMPLEX = 2
def __init__(
self, nr_signal_inputs: int = MAX_INPUTS, first_signal_input_index: int = 0
self,
nr_signal_inputs: int = MAX_INPUTS,
first_signal_input_index: int = 0,
nr_parallel_subbands: int = MAX_PARALLEL_SUBBANDS,
):
if nr_signal_inputs % self.BLOCK_LENGTH != 0:
# This restriction could be lifted if we slice sub blocks out
......@@ -255,6 +258,7 @@ class XSTCollector(StatisticsCollector):
self.nr_signal_inputs = nr_signal_inputs
self.first_signal_input_index = first_signal_input_index
self.nr_parallel_subbands = nr_parallel_subbands
super().__init__()
......@@ -280,7 +284,7 @@ class XSTCollector(StatisticsCollector):
# Last value array we've constructed out of the packets
"xst_blocks": numpy.zeros(
(
self.MAX_PARALLEL_SUBBANDS,
self.nr_parallel_subbands,
self.nr_blocks,
self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX,
),
......@@ -289,20 +293,20 @@ class XSTCollector(StatisticsCollector):
# Whether the values are actually conjugated and transposed
"xst_conjugated": numpy.zeros(
(
self.MAX_PARALLEL_SUBBANDS,
self.nr_parallel_subbands,
self.nr_blocks,
),
dtype=bool,
),
# When the youngest data for each subband was received
"xst_timestamps": numpy.zeros(
(self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float64
(self.nr_parallel_subbands,), dtype=numpy.float64
),
"xst_subbands": numpy.zeros(
(self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.uint16
(self.nr_parallel_subbands,), dtype=numpy.uint16
),
"xst_integration_intervals": numpy.zeros(
(self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float32
(self.nr_parallel_subbands,), dtype=numpy.float32
),
}
)
......@@ -310,7 +314,7 @@ class XSTCollector(StatisticsCollector):
return defaults
def select_subband_slot(self, subband):
"""Return which subband slot (0..MAX_PARALLEL_SUBBANDS) to use
"""Return which subband slot (0..nr_parallel_subbands) to use
Selects which frontend to use if it is a new subband.
Keep recording the same subband if we're already tracking it, but
......@@ -394,9 +398,9 @@ class XSTCollector(StatisticsCollector):
# we keep track of multiple subbands. select slot for this one
subband_slot = self.select_subband_slot(fields.subband_index)
assert 0 <= subband_slot < self.MAX_PARALLEL_SUBBANDS, (
assert 0 <= subband_slot < self.nr_parallel_subbands, (
f"Selected slot{subband_slot}, but only have room for "
f"{self.MAX_PARALLEL_SUBBANDS}. Existing slots are "
f"{self.nr_parallel_subbands}. Existing slots are "
f"{self.parameters['xst_subbands']}, processing subband "
f"{fields.subband_index}."
)
......@@ -439,12 +443,12 @@ class XSTCollector(StatisticsCollector):
def xst_values(self, subband_indices=None):
"""xst_blocks, but as a matrix[len(subband_indices)][MAX_INPUTS][MAX_INPUTS]
The subband indices must be in [0..MAX_PARALLEL_SUBBANDS). By default, all
The subband indices must be in [0..nr_parallel_subbands). By default, all
recorded XSTs are returned.
"""
if subband_indices is None:
subband_indices = range(self.MAX_PARALLEL_SUBBANDS)
subband_indices = range(self.nr_parallel_subbands)
matrix = numpy.zeros(
(len(subband_indices), self.nr_signal_inputs, self.nr_signal_inputs),
......
......@@ -659,7 +659,7 @@ class XstHdf5Writer(HDF5Writer):
return XSTPacket(packet)
def new_collector(self):
return XSTCollector(self.nr_signal_inputs, self.first_signal_input_index)
return XSTCollector(self.nr_signal_inputs, self.first_signal_input_index, 1)
def next_filename(self, timestamp, suffix=".h5"):
time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment