diff --git a/README.md b/README.md index 2332056070cf351ef16ad136741797c33b4f941b..765ca5e88e74ded6f12eea718cee8bb1c7ec8c4d 100644 --- a/README.md +++ b/README.md @@ -131,6 +131,7 @@ tox -e debug tests.requests.test_prometheus ## Release notes +- 0.18.1 - Flush HDF5 files explicitly, reduce memory usage for XSTs - 0.18.0 - MultiStationObservation and StationFutures allow multi field observations - 0.17.3 - Fix hosts and ports to be compatible with consul overlay network - 0.17.2 - Fix antennafield_device naming after separation in `AFL` and `AFH` diff --git a/VERSION b/VERSION index 884e9604b53479049601c9877c3d81978ff8d861..249afd517d9df2473f6cf45b440786f180bc656b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.17.3 +0.18.1 diff --git a/lofar_station_client/file_access/hdf/_hdf_writers.py b/lofar_station_client/file_access/hdf/_hdf_writers.py index 063defbd668732c959cb86233196f2c0441fa863..b1e52ea7b0e9d26030eff53ba9dcf39a0767d95d 100644 --- a/lofar_station_client/file_access/hdf/_hdf_writers.py +++ b/lofar_station_client/file_access/hdf/_hdf_writers.py @@ -32,6 +32,7 @@ class HdfFileWriter(HdfFileReader[T], FileWriter[T]): def __init__(self, name, target_type, create): self._create = create + self._hdf5_file = None self.writers: list[HdfDataWriter] = [] super().__init__(name, target_type) @@ -47,6 +48,9 @@ class HdfFileWriter(HdfFileReader[T], FileWriter[T]): writer.flush() self.writers = [] + if self._hdf5_file is not None: + self._hdf5_file.flush() + def close(self): self.flush() super().close() diff --git a/lofar_station_client/statistics/collector.py b/lofar_station_client/statistics/collector.py index 9de6825018162c5758ea87a8057ce8a954f89143..b66799a884392cc482542d3abfd5d4bf1731c93c 100644 --- a/lofar_station_client/statistics/collector.py +++ b/lofar_station_client/statistics/collector.py @@ -243,7 +243,10 @@ class XSTCollector(StatisticsCollector): VALUES_PER_COMPLEX = 2 def __init__( - self, nr_signal_inputs: int = MAX_INPUTS, first_signal_input_index: int = 0 + self, + nr_signal_inputs: int = MAX_INPUTS, + first_signal_input_index: int = 0, + nr_parallel_subbands: int = MAX_PARALLEL_SUBBANDS, ): if nr_signal_inputs % self.BLOCK_LENGTH != 0: # This restriction could be lifted if we slice sub blocks out @@ -255,6 +258,7 @@ class XSTCollector(StatisticsCollector): self.nr_signal_inputs = nr_signal_inputs self.first_signal_input_index = first_signal_input_index + self.nr_parallel_subbands = nr_parallel_subbands super().__init__() @@ -280,7 +284,7 @@ class XSTCollector(StatisticsCollector): # Last value array we've constructed out of the packets "xst_blocks": numpy.zeros( ( - self.MAX_PARALLEL_SUBBANDS, + self.nr_parallel_subbands, self.nr_blocks, self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX, ), @@ -289,20 +293,20 @@ class XSTCollector(StatisticsCollector): # Whether the values are actually conjugated and transposed "xst_conjugated": numpy.zeros( ( - self.MAX_PARALLEL_SUBBANDS, + self.nr_parallel_subbands, self.nr_blocks, ), dtype=bool, ), # When the youngest data for each subband was received "xst_timestamps": numpy.zeros( - (self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float64 + (self.nr_parallel_subbands,), dtype=numpy.float64 ), "xst_subbands": numpy.zeros( - (self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.uint16 + (self.nr_parallel_subbands,), dtype=numpy.uint16 ), "xst_integration_intervals": numpy.zeros( - (self.MAX_PARALLEL_SUBBANDS,), dtype=numpy.float32 + (self.nr_parallel_subbands,), dtype=numpy.float32 ), } ) @@ -310,7 +314,7 @@ class XSTCollector(StatisticsCollector): return defaults def select_subband_slot(self, subband): - """Return which subband slot (0..MAX_PARALLEL_SUBBANDS) to use + """Return which subband slot (0..nr_parallel_subbands) to use Selects which frontend to use if it is a new subband. Keep recording the same subband if we're already tracking it, but @@ -394,9 +398,9 @@ class XSTCollector(StatisticsCollector): # we keep track of multiple subbands. select slot for this one subband_slot = self.select_subband_slot(fields.subband_index) - assert 0 <= subband_slot < self.MAX_PARALLEL_SUBBANDS, ( + assert 0 <= subband_slot < self.nr_parallel_subbands, ( f"Selected slot{subband_slot}, but only have room for " - f"{self.MAX_PARALLEL_SUBBANDS}. Existing slots are " + f"{self.nr_parallel_subbands}. Existing slots are " f"{self.parameters['xst_subbands']}, processing subband " f"{fields.subband_index}." ) @@ -439,12 +443,12 @@ class XSTCollector(StatisticsCollector): def xst_values(self, subband_indices=None): """xst_blocks, but as a matrix[len(subband_indices)][MAX_INPUTS][MAX_INPUTS] - The subband indices must be in [0..MAX_PARALLEL_SUBBANDS). By default, all + The subband indices must be in [0..nr_parallel_subbands). By default, all recorded XSTs are returned. """ if subband_indices is None: - subband_indices = range(self.MAX_PARALLEL_SUBBANDS) + subband_indices = range(self.nr_parallel_subbands) matrix = numpy.zeros( (len(subband_indices), self.nr_signal_inputs, self.nr_signal_inputs), diff --git a/lofar_station_client/statistics/writer/hdf5.py b/lofar_station_client/statistics/writer/hdf5.py index 7ff165e806a354e81867dd03fbc8bf63678f0449..f33489e84c1c0fd89cfec5f0e4efd350022d340d 100644 --- a/lofar_station_client/statistics/writer/hdf5.py +++ b/lofar_station_client/statistics/writer/hdf5.py @@ -659,7 +659,7 @@ class XstHdf5Writer(HDF5Writer): return XSTPacket(packet) def new_collector(self): - return XSTCollector(self.nr_signal_inputs, self.first_signal_input_index) + return XSTCollector(self.nr_signal_inputs, self.first_signal_input_index, 1) def next_filename(self, timestamp, suffix=".h5"): time_str = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S"))