Skip to content
Snippets Groups Projects
Commit 5593e579 authored by Hannes Feldt's avatar Hannes Feldt
Browse files

cleanup

parent 607b7084
No related branches found
No related tags found
1 merge request!93L2SS-1582: Fix handling of closing streams on HDF5 files
Pipeline #83993 passed
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) # Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
"""Hdf5 packets file writer""" """Hdf5 packets file writer"""
...@@ -29,7 +29,6 @@ from lofar_station_client.statistics.collectors._xst import XSTCollector ...@@ -29,7 +29,6 @@ from lofar_station_client.statistics.collectors._xst import XSTCollector
from lofar_station_client.statistics.packets import StatisticsPacket from lofar_station_client.statistics.packets import StatisticsPacket
from lofar_station_client.statistics.statistics_data import ( from lofar_station_client.statistics.statistics_data import (
StatisticsDataFile, StatisticsDataFile,
StatisticsData,
) )
try: try:
...@@ -378,7 +377,7 @@ class HDF5Writer(ABC): ...@@ -378,7 +377,7 @@ class HDF5Writer(ABC):
matrix_name, matrix_name,
) )
def _add_device_metadata(self, matrix: StatisticsData): def _add_device_metadata(self, matrix: numpy.ndarray):
# add station state # add station state
if self.antennafield_device: if self.antennafield_device:
try: try:
...@@ -464,7 +463,7 @@ class HDF5Writer(ABC): ...@@ -464,7 +463,7 @@ class HDF5Writer(ABC):
return matrix return matrix
@abstractmethod @abstractmethod
def get_matrix_data(self) -> StatisticsData: def get_matrix_data(self) -> numpy.ndarray:
"""Abstract method""" """Abstract method"""
def next_filename(self, timestamp, suffix=".h5"): def next_filename(self, timestamp, suffix=".h5"):
...@@ -559,12 +558,12 @@ class SstHdf5Writer(HDF5Writer): ...@@ -559,12 +558,12 @@ class SstHdf5Writer(HDF5Writer):
def new_collector(self): def new_collector(self):
return SSTCollector(self.nr_signal_inputs, self.first_signal_input_index) return SSTCollector(self.nr_signal_inputs, self.first_signal_input_index)
def get_matrix(self) -> StatisticsData: def get_matrix(self) -> numpy.ndarray:
matrix = super().get_matrix() matrix = super().get_matrix()
matrix.subbands = numpy.array(range(512)) matrix.subbands = numpy.array(range(512))
return matrix return matrix
def get_matrix_data(self) -> StatisticsData: def get_matrix_data(self) -> numpy.ndarray:
# first obtain all values from SDP # first obtain all values from SDP
all_values = self.current_collector.parameters["sst_values"].astype( all_values = self.current_collector.parameters["sst_values"].astype(
numpy.float32 numpy.float32
...@@ -576,7 +575,7 @@ class SstHdf5Writer(HDF5Writer): ...@@ -576,7 +575,7 @@ class SstHdf5Writer(HDF5Writer):
else all_values else all_values
) )
return our_values.view(StatisticsData) return our_values
class BstHdf5Writer(HDF5Writer): class BstHdf5Writer(HDF5Writer):
...@@ -600,7 +599,7 @@ class BstHdf5Writer(HDF5Writer): ...@@ -600,7 +599,7 @@ class BstHdf5Writer(HDF5Writer):
def new_collector(self): def new_collector(self):
return BSTCollector() return BSTCollector()
def get_matrix(self) -> StatisticsData: def get_matrix(self) -> numpy.ndarray:
matrix = super().get_matrix() matrix = super().get_matrix()
if self.digitalbeam_device: if self.digitalbeam_device:
...@@ -619,12 +618,8 @@ class BstHdf5Writer(HDF5Writer): ...@@ -619,12 +618,8 @@ class BstHdf5Writer(HDF5Writer):
return matrix return matrix
def get_matrix_data(self) -> StatisticsData: def get_matrix_data(self) -> numpy.ndarray:
return ( return self.current_collector.parameters["bst_values"].astype(numpy.float32)
self.current_collector.parameters["bst_values"]
.astype(numpy.float32)
.view(StatisticsData)
)
class XstHdf5Writer(HDF5Writer): class XstHdf5Writer(HDF5Writer):
...@@ -663,12 +658,12 @@ class XstHdf5Writer(HDF5Writer): ...@@ -663,12 +658,12 @@ class XstHdf5Writer(HDF5Writer):
f"{suffix}" f"{suffix}"
) )
def get_matrix(self) -> StatisticsData: def get_matrix(self) -> numpy.ndarray:
matrix = super().get_matrix() matrix = super().get_matrix()
matrix.subbands = numpy.array([self.subband_index]) matrix.subbands = numpy.array([self.subband_index])
return matrix return matrix
def get_matrix_data(self) -> StatisticsData: def get_matrix_data(self) -> numpy.ndarray:
# requires a function call to transform the xst_blocks in to the right # requires a function call to transform the xst_blocks in to the right
# structure # structure
# #
...@@ -685,7 +680,7 @@ class XstHdf5Writer(HDF5Writer): ...@@ -685,7 +680,7 @@ class XstHdf5Writer(HDF5Writer):
else all_values else all_values
) )
return our_values.view(StatisticsData) return our_values
class ParallelXstHdf5Writer: class ParallelXstHdf5Writer:
......
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import sys
from os.path import dirname, isfile
from tempfile import TemporaryDirectory
from typing import Tuple
from unittest import mock
from unittest.mock import patch
import h5py
import numpy
from lofar_station_client import __version__ as lsc_version
from lofar_station_client.dts.constants import N_pol
from lofar_station_client.statistics.reader import entry as reader_entry
from lofar_station_client.statistics.statistics_data import (
StatisticsData,
StatisticsFileHeader,
)
from lofar_station_client.statistics.writer import (
entry,
hdf5,
__version__ as writer_version,
)
from tests import base
from tests.test_devices import (
FakeAntennaFieldDeviceProxy,
FakeOffAntennaFieldDeviceProxy,
FakeDigitalBeamDeviceProxy,
FakeTileBeamDeviceProxy,
FakeSDPDeviceProxy,
FakeSDPFirmwareDeviceProxy,
FakeStationManagerDeviceProxy,
)
class TestStatisticsReaderWriter(base.TestCase):
"""Parent TestStatistics class which exposes common internal methods"""
def _mock_get_tango_device(self, tango_disabled, host, device_name):
"""Return our mocked DeviceProxies"""
if device_name == "STAT/AFH/HBA":
return FakeAntennaFieldDeviceProxy(device_name)
if device_name == "STAT/DigitalBeam/HBA":
return FakeDigitalBeamDeviceProxy(device_name)
if device_name == "STAT/TileBeam/HBA":
return FakeTileBeamDeviceProxy(device_name)
if device_name == "STAT/StationManager/1":
return FakeStationManagerDeviceProxy
if device_name == "STAT/SDP/HBA":
return FakeSDPDeviceProxy(device_name)
if device_name == "STAT/SDPFirmware/HBA":
return FakeSDPFirmwareDeviceProxy(device_name)
raise ValueError(
f"Device not mocked, and thus not available in this test: {device_name}"
)
def _mock_get_tango_device_off(self, tango_disabled, host, device_name):
"""Return our mocked DeviceProxies that simulate a device that is off"""
if device_name == "STAT/AFH/HBA":
return FakeOffAntennaFieldDeviceProxy(device_name)
if device_name == "STAT/DigitalBeam/HBA":
return FakeDigitalBeamDeviceProxy(device_name)
if device_name == "STAT/TileBeam/HBA":
return FakeTileBeamDeviceProxy(device_name)
if device_name == "STAT/StationManager/1":
return FakeStationManagerDeviceProxy
if device_name == "STAT/SDP/HBA":
return FakeSDPDeviceProxy(device_name)
if device_name == "STAT/SDPFirmware/HBA":
return FakeSDPFirmwareDeviceProxy(device_name)
raise ValueError(
f"Device not mocked, and thus not available in this test: {device_name}"
)
class TestStatisticsWriterVersion(TestStatisticsReaderWriter):
"""TestStatistics class for print testing"""
@patch("builtins.print") # Mock the print function
def test_version(self, mock_print):
"""Print the stats-writer and lofar-station-client version"""
new_sys_argv = [sys.argv[0], "--version"]
with mock.patch.object(entry.sys, "argv", new_sys_argv):
entry.main()
mock_print.assert_called_with(
f"LOFAR Statistics Writer version: {writer_version} - "
f"LOFAR Station Client version: {lsc_version}"
)
class TestStatisticsReaderWriterSST(TestStatisticsReaderWriter):
"""TestStatistics class for SST-mode"""
def _run_writer_reader(
self, tmpdir: str, writer_argv: list, antennafield: str = "HBA"
) -> Tuple[StatisticsData, StatisticsFileHeader]:
"""Run the statistics writer with the given arguments,
and read and return the output."""
# default arguments for statistics writer
default_writer_sys_argv = [
sys.argv[0],
"--mode",
"SST",
"--file",
dirname(__file__) + "/SDP_SST_statistics_packets.bin",
"--output_dir",
tmpdir,
]
if antennafield != "unknown":
default_writer_sys_argv += ["--antennafield", antennafield]
with mock.patch.object(
entry.sys, "argv", default_writer_sys_argv + writer_argv
):
entry.main()
# check if file was written
self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40_{antennafield}.h5"))
# default arguments for statistics reader
default_reader_sys_argv = [
sys.argv[0],
"--files",
f"{tmpdir}/SST_2021-09-20-12-17-40_{antennafield}.h5",
"--start_time",
"2021-09-20#07:40:08.937",
"--end_time",
"2021-10-04#07:50:08.937",
]
# test statistics reader
with mock.patch.object(reader_entry.sys, "argv", default_reader_sys_argv):
stat_parser = reader_entry.setup_stat_parser()
sst_statistics = stat_parser.list_statistics()
self.assertIsNotNone(sst_statistics)
stat = stat_parser.get_statistic(
"2021-09-20T12:17:40.000000"
) # same as stat_parser.statistics[0]
file_header = stat_parser.file_header
self.assertIsNotNone(stat)
return stat, file_header
def test_header_info(self):
"""Test whether the header info are inserted and collected in the proper way"""
with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device):
with TemporaryDirectory() as tmpdir:
_, file_header = self._run_writer_reader(tmpdir, [])
self.assertIsNotNone(file_header.station_version)
self.assertIsNotNone(file_header.writer_version)
self.assertEqual("SST", file_header.mode)
def test_insert_tango_SST_statistics(self):
with TemporaryDirectory() as tmpdir:
writer_argv = []
with mock.patch.object(
entry, "_get_tango_device", self._mock_get_tango_device
):
stat, file_header = self._run_writer_reader(tmpdir, writer_argv)
self.assertEqual(121, stat.data_id_signal_input_index)
# Test some AntennField attributes, whether they match our mock
self.assertEqual("HBA", file_header.antenna_type)
self.assertListEqual(
["OK", "OK", "OK"], file_header.antenna_quality.tolist()
)
def test_no_tango_SST_statistics(self):
with TemporaryDirectory() as tmpdir:
writer_argv = [
"--no-tango",
]
_ = self._run_writer_reader(tmpdir, writer_argv, "unknown")
def test_SST_statistics_with_device_in_off(self):
with TemporaryDirectory() as tmpdir:
writer_argv = []
with mock.patch.object(
entry, "_get_tango_device", self._mock_get_tango_device_off
):
_ = self._run_writer_reader(tmpdir, writer_argv)
def test_station_name(self):
with TemporaryDirectory() as tmpdir:
writer_argv = []
with mock.patch.object(
entry, "_get_tango_device", self._mock_get_tango_device
):
_, file_header = self._run_writer_reader(tmpdir, writer_argv)
self.assertEqual("DevStation", file_header.station_name)
class TestStatisticsReaderWriterBST(TestStatisticsReaderWriter):
"""TestStatistics class for BST-mode"""
def _run_writer_reader(
self, tmpdir: str, writer_argv: list
) -> Tuple[StatisticsData, StatisticsFileHeader]:
"""Run the statistics writer with the given arguments,
and read and return the output."""
# default arguments for statistics writer
default_writer_sys_argv = [
sys.argv[0],
"--mode",
"BST",
"--antennafield",
"HBA",
"--file",
dirname(__file__) + "/SDP_BST_statistics_packets.bin",
"--output_dir",
tmpdir,
]
with mock.patch.object(
entry.sys, "argv", default_writer_sys_argv + writer_argv
):
entry.main()
# check if file was written
self.assertTrue(isfile(f"{tmpdir}/BST_2022-05-20-11-08-44_HBA.h5"))
# default arguments for statistics reader
default_reader_sys_argv = [
sys.argv[0],
"--files",
f"{tmpdir}/BST_2022-05-20-11-08-44_HBA.h5",
"--start_time",
"2021-09-20#07:40:08.937",
"--end_time",
"2021-10-04#07:50:08.937",
]
# test statistics reader
with mock.patch.object(reader_entry.sys, "argv", default_reader_sys_argv):
stat_parser = reader_entry.setup_stat_parser()
bst_statistics = stat_parser.list_statistics()
self.assertIsNotNone(bst_statistics)
file_header = stat_parser.file_header
return file_header
def test_insert_tango_BST_statistics(self):
with TemporaryDirectory() as tmpdir:
writer_argv = []
with mock.patch.object(
entry, "_get_tango_device", self._mock_get_tango_device
):
_ = self._run_writer_reader(tmpdir, writer_argv)
# validate HDF5 content
with h5py.File(f"{tmpdir}/BST_2022-05-20-11-08-44_HBA.h5") as f:
# validate header
self.assertIn("station_version", dict(f.attrs))
self.assertIn("writer_version", dict(f.attrs))
self.assertIn("mode", dict(f.attrs))
# check for the datasets present in our input data
self.assertIn("BST_2022-05-20T11:08:45.000", dict(f.items()))
def test_bst(self):
with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device):
with TemporaryDirectory() as tmpdir:
new_sys_argv = [
sys.argv[0],
"--mode",
"BST",
"--antennafield",
"HBA",
"--file",
dirname(__file__) + "/SDP_BST_statistics_packets.bin",
"--output_dir",
tmpdir,
]
with mock.patch.object(entry.sys, "argv", new_sys_argv):
entry.main()
# check if file was written
self.assertTrue(isfile(f"{tmpdir}/BST_2022-05-20-11-08-44_HBA.h5"))
class TestStatisticsWriterXST(TestStatisticsReaderWriter):
"""TestStatistics class for XST-mode"""
def test_xst(self):
with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device):
with TemporaryDirectory() as tmpdir:
new_sys_argv = [
sys.argv[0],
"--mode",
"XST",
"--antennafield",
"HBA",
"--file",
dirname(__file__) + "/SDP_XST_statistics_packets.bin",
"--output_dir",
tmpdir,
]
with mock.patch.object(entry.sys, "argv", new_sys_argv):
entry.main()
# check if file was written
self.assertTrue(
isfile(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5")
)
# validate HDF5 content
with h5py.File(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5") as f:
# validate header
self.assertIn("station_version", dict(f.attrs))
self.assertIn("writer_version", dict(f.attrs))
self.assertIn("mode", dict(f.attrs))
# check for the datasets present in our input data
self.assertIn("XST_2021-09-13T13:21:32.000", dict(f.items()))
self.assertIn("XST_2021-09-13T13:21:33.000", dict(f.items()))
self.assertIn("XST_2021-09-13T13:21:34.000", dict(f.items()))
self.assertIn("XST_2021-09-13T13:21:35.000", dict(f.items()))
self.assertIn("XST_2021-09-13T13:21:36.000", dict(f.items()))
self.assertIn("XST_2021-09-13T13:21:37.000", dict(f.items()))
self.assertIn("XST_2021-09-13T13:21:38.000", dict(f.items()))
# check dataset dimensions, should match the actual
# number of antennas, and cover both polarisations.
nr_antennas = FakeAntennaFieldDeviceProxy.nr_antennas_R
self.assertEqual(
(nr_antennas * N_pol, nr_antennas * N_pol),
f["XST_2021-09-13T13:21:32.000"].shape,
)
# check dataset header
self.assertIn(
"timestamp", dict(f["XST_2021-09-13T13:21:32.000"].attrs)
)
# check compression
self.assertEqual(
f["XST_2021-09-13T13:21:32.000"].compression, "gzip"
)
def test_xst_multiple_subbands(self):
with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device):
with TemporaryDirectory() as tmpdir:
new_sys_argv = [
sys.argv[0],
"--mode",
"XST",
"--antennafield",
"HBA",
"--file",
dirname(__file__)
+ "/SDP_XST_statistics_packets_multiple_subbands.bin",
"--output_dir",
tmpdir,
]
with mock.patch.object(entry.sys, "argv", new_sys_argv):
entry.main()
# check if files were written
self.assertTrue(
isfile(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5")
)
self.assertTrue(
isfile(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB103.h5")
)
def test_xst_with_antennafield(self):
with TemporaryDirectory() as tmpdir:
with mock.patch.object(
entry, "_get_tango_device", self._mock_get_tango_device
):
new_sys_argv = [
sys.argv[0],
"--mode",
"XST",
"--antennafield",
"HBA",
"--file",
dirname(__file__) + "/SDP_XST_statistics_packets.bin",
"--output_dir",
tmpdir,
]
with mock.patch.object(entry.sys, "argv", new_sys_argv):
entry.main()
# check if file was written
self.assertTrue(
isfile(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5")
)
# validate HDF5 content
with h5py.File(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5") as f:
# check extra header fields provided by the AntennaField
self.assertIn("antenna_names", dict(f.attrs))
self.assertIn("antenna_quality", dict(f.attrs))
self.assertIn("antenna_type", dict(f.attrs))
self.assertIn("antennafield_device", dict(f.attrs))
self.assertIn("rcu_pcb_id", dict(f.attrs))
self.assertIn("rcu_pcb_version", dict(f.attrs))
# check dataset dimensions, should match the number of antennas,
# and cover both polarisations.
self.assertEqual((6, 6), f["XST_2021-09-13T13:21:32.000"].shape)
class TestDictToHdf5Attrs(base.TestCase):
def test_empty_dict(self):
self.assertEqual({}, hdf5._dict_to_hdf5_attrs({}))
def test_int(self):
self.assertDictEqual({"a": 1}, hdf5._dict_to_hdf5_attrs({"a": 1}))
def test_str(self):
self.assertDictEqual({"a": "b"}, hdf5._dict_to_hdf5_attrs({"a": "b"}))
def test_none(self):
self.assertTrue(
h5py.Empty, hdf5._dict_to_hdf5_attrs({"a": None})["a"].__class__
)
def test_nested_dict(self):
self.assertDictEqual({"a_b": 1}, hdf5._dict_to_hdf5_attrs({"a": {"b": 1}}))
class TestSelect1D(base.TestCase):
def setUp(self):
self.in_data = numpy.array(list(range(128)))
def test_select1d_empty_list(self):
self.assertListEqual([], hdf5._select1d(self.in_data, []).tolist())
def test_select1d_select(self):
self.assertListEqual([2, 4], hdf5._select1d(self.in_data, [2, 4]).tolist())
def test_select1d_select_with_none(self):
self.assertListEqual(
[2, -1], hdf5._select1d(self.in_data, [2, None], none_value=-1).tolist()
)
class TestSelect2D(base.TestCase):
def setUp(self):
# abuse complex numbers to create a 2D matrix with unique values
self.in_data = numpy.array([[x + y * 1j for x in range(16)] for y in range(8)])
def test_select2d_empty_list(self):
self.assertListEqual([[]], hdf5._select2d(self.in_data, []).tolist())
def test_select2d_select(self):
self.assertListEqual(
[[2 + 2j, 4 + 2j], [2 + 4j, 4 + 4j]],
hdf5._select2d(self.in_data, [2, 4]).tolist(),
)
def test_select2d_select_with_none(self):
self.assertListEqual(
[[2 + 2j, -1], [-1, -1]],
hdf5._select2d(self.in_data, [2, None], none_value=-1).tolist(),
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment