diff --git a/README.md b/README.md index 8c65ffaca28f55c4e3c837c7a7253979a1a3b64a..47dcd46e7f2a90c78ddda1ca314f5bf5ce906efb 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,7 @@ tox -e debug tests.requests.test_prometheus ## Release notes +- 0.19.1 - Fix handling of closing streams on HDF5 files - 0.19.0 - Compatibility with tangostationcontrol 0.35.0: query Antenna_Status_R, which replaced Antenna_Quality_str_R in tangostationcontrol 0.35.0. - 0.18.8 - Migrate case insensitive dict from station control diff --git a/VERSION b/VERSION index 1cf0537c34385c81dd797cc51be2a589e17118e9..41915c799477856bc769e838128f06905e641650 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.19.0 +0.19.1 diff --git a/lofar_station_client/file_access/hdf/_hdf_readers.py b/lofar_station_client/file_access/hdf/_hdf_readers.py index bdbbe25ab2daa6cc78a774bff4b7826bad39a995..62d71fc5354141534dbe0a7f9c641c8a35682a93 100644 --- a/lofar_station_client/file_access/hdf/_hdf_readers.py +++ b/lofar_station_client/file_access/hdf/_hdf_readers.py @@ -64,6 +64,7 @@ class HdfFileReader(FileReader[T]): if not self._is_closed: self._is_closed = True + self._hdf5_file.close() del self._hdf5_file def load(self, instance: T): @@ -194,7 +195,7 @@ class HdfDataReader(DataReader): return lambda value: cls._read_object(target_type, value, data_reader) -def read_hdf5(name: str, target_type: Type[T]) -> FileReader[T]: +def read_hdf5(name, target_type: Type[T]) -> FileReader[T]: """ Open a HDF5 file by name/path """ diff --git a/lofar_station_client/file_access/hdf/_hdf_writers.py b/lofar_station_client/file_access/hdf/_hdf_writers.py index 49c871cf1bbbf877311e4d544e9b059cc015ef5c..a28b1a5053b751dae872128986739e90164f79dc 100644 --- a/lofar_station_client/file_access/hdf/_hdf_writers.py +++ b/lofar_station_client/file_access/hdf/_hdf_writers.py @@ -280,14 +280,14 @@ class HdfDataWriter(HdfDataReader, DataWriter): _attach_object(target_type, value) -def open_hdf5(name: str, target_type: Type[T]) -> FileWriter[T]: +def open_hdf5(name, target_type: Type[T]) -> FileWriter[T]: """ Open a HDF5 file by name/path """ return HdfFileWriter[T](name, target_type, False) -def create_hdf5(name: str, target_type: Type[T]) -> FileWriter[T]: +def create_hdf5(name, target_type: Type[T]) -> FileWriter[T]: """ Create a HDF5 file by name/path """ diff --git a/lofar_station_client/statistics/reader/entry.py b/lofar_station_client/statistics/reader/entry.py index 2ecc10e5649b21413aaedb06dd84bee7ef479e4f..f819d48897a4fd1d8083fcb1d45f349d092b4aea 100644 --- a/lofar_station_client/statistics/reader/entry.py +++ b/lofar_station_client/statistics/reader/entry.py @@ -1,4 +1,4 @@ -# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) +# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 @@ -108,6 +108,7 @@ class StatisticsParser: logger.debug("Parsing hdf5 statistics file") with hdf5_file as statistic_data: + hdf5_file.load(statistic_data) if self.file_header is None: self.file_header = statistic_data elif not StatisticsFileHeader.__eq__(self.file_header, statistic_data): diff --git a/lofar_station_client/statistics/statistics_data.py b/lofar_station_client/statistics/statistics_data.py index 40fbedbb9de504818acb4d8ece24515a6f9f8ed0..e483b2f14a64484917e2801f8d8db7d0066bed28 100644 --- a/lofar_station_client/statistics/statistics_data.py +++ b/lofar_station_client/statistics/statistics_data.py @@ -8,170 +8,13 @@ Models the structure of an HDF statistics file. """ import inspect -from typing import Dict, List +from typing import Dict from numpy import ndarray from lofar_station_client.file_access import attribute -class StatisticsData(ndarray): - """ - Representation of a periodic data matrix written to the HDF statistics file. - """ - - version_id: int = attribute() - """ Version of the packet format. """ - - timestamp: str = attribute() - """ timestamp of the data """ - - station_info_raw: int = attribute(name="station_info__raw", optional=True) - """ Bit field with station information, encoding several other properties. """ - - station_info_station_id: int = attribute() - """ Station identifier """ - - station_info_antenna_field_index: int = attribute() - """ Antenna field number """ - - source_info_t_adc: int = attribute() - """" Sampling clock. 0 = 160 MHz, 1 = 200 MHz. """ - - source_info_subband_calibrated_flag: bool = attribute() - """ subband_calibrated_flag 1 = subband data had subband calibration values applied, - 0 = not. - """ - - source_info_payload_error: bool = attribute() - """ 0 = data is ok, 1 = data is corrupted (a fault was encountered). """ - - source_info_nyquist_zone_index: int = attribute() - """ nyquist zone of filter - - - 0 = 0 -- 1/2 * t_adc Hz (low band), - - 1 = 1/2 * t_adc -- t_adc Hz (high band), - - 2 = t_adc -- 3/2 * t_adc Hz (high band). - """ - - source_info_gn_index: int = attribute() - """ Global index of FPGA that emitted this packet. """ - - source_info_fsub_type: int = attribute() - """ Sampling method. 0 = critically sampled, 1 = oversampled. """ - - source_info_beam_repositioning_flag: bool = attribute() - - source_info_antenna_band_index: int = attribute() - """ Antenna type. 0 = low band, 1 = high band. """ - - source_info_raw: int = attribute(name="source_info__raw", optional=True) - """ Bit field with input information, encoding several other properties. """ - - observation_id: int = attribute() - """ Observation identifier """ - - nof_statistics_per_packet: int = attribute() - """ Number of statistic data points in the payload. """ - - nof_signal_inputs: int = attribute() - """ Number of inputs that contributed to data in this packet. """ - - nof_bytes_per_statistic: int = attribute() - """ Word size of each statistic """ - - marker: str = attribute() - """ The type of statistic: - - - 'S' = SST - - 'B' = BST - - 'X' = XST - - 'b' = beamlet - """ - - integration_interval: float = attribute() - """ Integration interval, in seconds. """ - - integration_interval_raw: int = attribute() - """ Integration interval, in block periods. """ - - data_id_raw: int = attribute(name="data_id__raw", optional=True) - """ Bit field with payload information, encoding several other properties. """ - - block_serial_number: int = attribute() - """ Timestamp of the data, in block periods since 1970. """ - - block_period_raw: int = attribute() - """ Block period, in nanoseconds. """ - - block_period: float = attribute() - """ Block period, in seconds. """ - - data_id_signal_input_index: int = attribute() - """ - SST input (antenna polarisation) index for which this packet contains statistics - """ - - data_id_subband_index: int = attribute(optional=True) - """ - XST subband number for which this packet contains statistics. - """ - - data_id_first_baseline: int = attribute(optional=True) - """ - XST first antenna pair for which this packet contains statistics. - """ - - data_id_beamlet_index: int = attribute(optional=True) - """ - BST the number of the beamlet for which this packet holds statistics. - """ - - nof_valid_payloads: ndarray = attribute() - """ Number of packets received so far that we could parse correctly and do not have - a payload error. One value per FPGA. """ - - nof_payload_errors: ndarray = attribute() - """ Number of packets that reported a payload error. One value per FPGA. """ - - gn_indices: ndarray = attribute() - """ Global node index (gn) as reported by each FPGA, for each element in the payload - counters. """ - - tile_beam_pointing_direction: List[str] = attribute(optional=True) - """ Direction of the tile beam """ - - tile_beam_tracking_enabled: List[bool] = attribute(optional=True) - """ Whether the tile beam is tracking """ - - digital_beam_pointing_direction: List[str] = attribute(optional=True) - """ Direction of the digital beam """ - - digital_beam_tracking_enabled: List[bool] = attribute(optional=True) - """ Whether the digital beam is tracking """ - - hbat_pwr_on: ndarray = attribute(optional=True) - """ Elements per hba tile """ - - rcu_attenuator_db: List[float] = attribute(name="rcu_attenuator_dB", optional=True) - """ Amount of dB with which each antenna signal must be adjusted to line up. """ - - rcu_dth_on: List[bool] = attribute(optional=True) - rcu_dth_freq: List[float] = attribute(optional=True) - - clock: int = attribute(optional=True) - """ clock in hz """ - - frequency_band: ndarray = attribute(optional=True) - """ filter selection """ - - sdp_subband_frequency_range: ndarray = attribute(optional=True) - """ subband frequencies of subbands 0 and 511, as assumed by SDP """ - - subbands: List[int] = attribute(optional=True) - """ Subband number for each beamlet """ - - class StatisticsFileHeader: """ Pythonic representation of the HDF statistics file header written by the statistics @@ -228,7 +71,7 @@ class StatisticsFileHeader: return True -class StatisticsDataFile(Dict[str, StatisticsData], StatisticsFileHeader): +class StatisticsDataFile(Dict[str, ndarray], StatisticsFileHeader): """ Pythonic representation of the HDF statistics file written by the statistics writer. diff --git a/lofar_station_client/statistics/writer/hdf5.py b/lofar_station_client/statistics/writer/hdf5.py index 90e9a5835689ba4da7ca9663f8d804ea2caaa411..6114a1652ca12ca497e87c7f51f8266d6651e9f4 100644 --- a/lofar_station_client/statistics/writer/hdf5.py +++ b/lofar_station_client/statistics/writer/hdf5.py @@ -1,4 +1,4 @@ -# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) +# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 """Hdf5 packets file writer""" @@ -29,7 +29,6 @@ from lofar_station_client.statistics.collectors._xst import XSTCollector from lofar_station_client.statistics.packets import StatisticsPacket from lofar_station_client.statistics.statistics_data import ( StatisticsDataFile, - StatisticsData, ) try: @@ -378,7 +377,7 @@ class HDF5Writer(ABC): matrix_name, ) - def _add_device_metadata(self, matrix: StatisticsData): + def _add_device_metadata(self, matrix: numpy.ndarray): # add station state if self.antennafield_device: try: @@ -464,7 +463,7 @@ class HDF5Writer(ABC): return matrix @abstractmethod - def get_matrix_data(self) -> StatisticsData: + def get_matrix_data(self) -> numpy.ndarray: """Abstract method""" def next_filename(self, timestamp, suffix=".h5"): @@ -559,12 +558,12 @@ class SstHdf5Writer(HDF5Writer): def new_collector(self): return SSTCollector(self.nr_signal_inputs, self.first_signal_input_index) - def get_matrix(self) -> StatisticsData: + def get_matrix(self) -> numpy.ndarray: matrix = super().get_matrix() matrix.subbands = numpy.array(range(512)) return matrix - def get_matrix_data(self) -> StatisticsData: + def get_matrix_data(self) -> numpy.ndarray: # first obtain all values from SDP all_values = self.current_collector.parameters["sst_values"].astype( numpy.float32 @@ -576,7 +575,7 @@ class SstHdf5Writer(HDF5Writer): else all_values ) - return our_values.view(StatisticsData) + return our_values class BstHdf5Writer(HDF5Writer): @@ -600,7 +599,7 @@ class BstHdf5Writer(HDF5Writer): def new_collector(self): return BSTCollector() - def get_matrix(self) -> StatisticsData: + def get_matrix(self) -> numpy.ndarray: matrix = super().get_matrix() if self.digitalbeam_device: @@ -619,12 +618,8 @@ class BstHdf5Writer(HDF5Writer): return matrix - def get_matrix_data(self) -> StatisticsData: - return ( - self.current_collector.parameters["bst_values"] - .astype(numpy.float32) - .view(StatisticsData) - ) + def get_matrix_data(self) -> numpy.ndarray: + return self.current_collector.parameters["bst_values"].astype(numpy.float32) class XstHdf5Writer(HDF5Writer): @@ -663,12 +658,12 @@ class XstHdf5Writer(HDF5Writer): f"{suffix}" ) - def get_matrix(self) -> StatisticsData: + def get_matrix(self) -> numpy.ndarray: matrix = super().get_matrix() matrix.subbands = numpy.array([self.subband_index]) return matrix - def get_matrix_data(self) -> StatisticsData: + def get_matrix_data(self) -> numpy.ndarray: # requires a function call to transform the xst_blocks in to the right # structure # @@ -685,7 +680,7 @@ class XstHdf5Writer(HDF5Writer): else all_values ) - return our_values.view(StatisticsData) + return our_values class ParallelXstHdf5Writer: diff --git a/tests/statistics/test_writer.py b/tests/statistics/test_writer.py deleted file mode 100644 index eff047ebe452b5ff77731efd5bfb51086b51751d..0000000000000000000000000000000000000000 --- a/tests/statistics/test_writer.py +++ /dev/null @@ -1,468 +0,0 @@ -# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy) -# SPDX-License-Identifier: Apache-2.0 - -from os.path import dirname, isfile -from tempfile import TemporaryDirectory -from unittest import mock -from unittest.mock import patch -from typing import Tuple -import sys -import h5py -import numpy - -from lofar_station_client.dts.constants import N_pol -from lofar_station_client.statistics.statistics_data import ( - StatisticsData, - StatisticsFileHeader, -) -from lofar_station_client.statistics.writer import ( - entry, - hdf5, - __version__ as writer_version, -) -from lofar_station_client import __version__ as lsc_version -from lofar_station_client.statistics.reader import entry as reader_entry -from tests.test_devices import ( - FakeAntennaFieldDeviceProxy, - FakeOffAntennaFieldDeviceProxy, - FakeDigitalBeamDeviceProxy, - FakeTileBeamDeviceProxy, - FakeSDPDeviceProxy, - FakeSDPFirmwareDeviceProxy, - FakeStationManagerDeviceProxy, -) -from tests import base - - -class TestStatisticsReaderWriter(base.TestCase): - """Parent TestStatistics class which exposes common internal methods""" - - def _mock_get_tango_device(self, tango_disabled, host, device_name): - """Return our mocked DeviceProxies""" - if device_name == "STAT/AFH/HBA": - return FakeAntennaFieldDeviceProxy(device_name) - if device_name == "STAT/DigitalBeam/HBA": - return FakeDigitalBeamDeviceProxy(device_name) - if device_name == "STAT/TileBeam/HBA": - return FakeTileBeamDeviceProxy(device_name) - if device_name == "STAT/StationManager/1": - return FakeStationManagerDeviceProxy - if device_name == "STAT/SDP/HBA": - return FakeSDPDeviceProxy(device_name) - if device_name == "STAT/SDPFirmware/HBA": - return FakeSDPFirmwareDeviceProxy(device_name) - raise ValueError( - f"Device not mocked, and thus not available in this test: {device_name}" - ) - - def _mock_get_tango_device_off(self, tango_disabled, host, device_name): - """Return our mocked DeviceProxies that simulate a device that is off""" - if device_name == "STAT/AFH/HBA": - return FakeOffAntennaFieldDeviceProxy(device_name) - if device_name == "STAT/DigitalBeam/HBA": - return FakeDigitalBeamDeviceProxy(device_name) - if device_name == "STAT/TileBeam/HBA": - return FakeTileBeamDeviceProxy(device_name) - if device_name == "STAT/StationManager/1": - return FakeStationManagerDeviceProxy - if device_name == "STAT/SDP/HBA": - return FakeSDPDeviceProxy(device_name) - if device_name == "STAT/SDPFirmware/HBA": - return FakeSDPFirmwareDeviceProxy(device_name) - raise ValueError( - f"Device not mocked, and thus not available in this test: {device_name}" - ) - - -class TestStatisticsWriterVersion(TestStatisticsReaderWriter): - """TestStatistics class for print testing""" - - @patch("builtins.print") # Mock the print function - def test_version(self, mock_print): - """Print the stats-writer and lofar-station-client version""" - new_sys_argv = [sys.argv[0], "--version"] - with mock.patch.object(entry.sys, "argv", new_sys_argv): - entry.main() - mock_print.assert_called_with( - f"LOFAR Statistics Writer version: {writer_version} - " - f"LOFAR Station Client version: {lsc_version}" - ) - - -class TestStatisticsReaderWriterSST(TestStatisticsReaderWriter): - """TestStatistics class for SST-mode""" - - def _run_writer_reader( - self, tmpdir: str, writer_argv: list, antennafield: str = "HBA" - ) -> Tuple[StatisticsData, StatisticsFileHeader]: - """Run the statistics writer with the given arguments, - and read and return the output.""" - # default arguments for statistics writer - default_writer_sys_argv = [ - sys.argv[0], - "--mode", - "SST", - "--file", - dirname(__file__) + "/SDP_SST_statistics_packets.bin", - "--output_dir", - tmpdir, - ] - - if antennafield != "unknown": - default_writer_sys_argv += ["--antennafield", antennafield] - - with mock.patch.object( - entry.sys, "argv", default_writer_sys_argv + writer_argv - ): - entry.main() - - # check if file was written - self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40_{antennafield}.h5")) - - # default arguments for statistics reader - default_reader_sys_argv = [ - sys.argv[0], - "--files", - f"{tmpdir}/SST_2021-09-20-12-17-40_{antennafield}.h5", - "--start_time", - "2021-09-20#07:40:08.937", - "--end_time", - "2021-10-04#07:50:08.937", - ] - - # test statistics reader - with mock.patch.object(reader_entry.sys, "argv", default_reader_sys_argv): - stat_parser = reader_entry.setup_stat_parser() - sst_statistics = stat_parser.list_statistics() - self.assertIsNotNone(sst_statistics) - stat = stat_parser.get_statistic( - "2021-09-20T12:17:40.000000" - ) # same as stat_parser.statistics[0] - file_header = stat_parser.file_header - self.assertIsNotNone(stat) - - return stat, file_header - - def test_header_info(self): - """Test whether the header info are inserted and collected in the proper way""" - with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device): - with TemporaryDirectory() as tmpdir: - _, file_header = self._run_writer_reader(tmpdir, []) - - self.assertIsNotNone(file_header.station_version) - self.assertIsNotNone(file_header.writer_version) - self.assertEqual("SST", file_header.mode) - - def test_insert_tango_SST_statistics(self): - with TemporaryDirectory() as tmpdir: - writer_argv = [] - - with mock.patch.object( - entry, "_get_tango_device", self._mock_get_tango_device - ): - stat, file_header = self._run_writer_reader(tmpdir, writer_argv) - - self.assertEqual(121, stat.data_id_signal_input_index) - - # Test some AntennField attributes, whether they match our mock - self.assertEqual("HBA", file_header.antenna_type) - self.assertListEqual( - ["OK", "OK", "OK"], file_header.antenna_quality.tolist() - ) - - def test_no_tango_SST_statistics(self): - with TemporaryDirectory() as tmpdir: - writer_argv = [ - "--no-tango", - ] - - _ = self._run_writer_reader(tmpdir, writer_argv, "unknown") - - def test_SST_statistics_with_device_in_off(self): - with TemporaryDirectory() as tmpdir: - writer_argv = [] - - with mock.patch.object( - entry, "_get_tango_device", self._mock_get_tango_device_off - ): - _ = self._run_writer_reader(tmpdir, writer_argv) - - def test_station_name(self): - with TemporaryDirectory() as tmpdir: - writer_argv = [] - - with mock.patch.object( - entry, "_get_tango_device", self._mock_get_tango_device - ): - _, file_header = self._run_writer_reader(tmpdir, writer_argv) - - self.assertEqual("DevStation", file_header.station_name) - - -class TestStatisticsReaderWriterBST(TestStatisticsReaderWriter): - """TestStatistics class for BST-mode""" - - def _run_writer_reader( - self, tmpdir: str, writer_argv: list - ) -> Tuple[StatisticsData, StatisticsFileHeader]: - """Run the statistics writer with the given arguments, - and read and return the output.""" - # default arguments for statistics writer - default_writer_sys_argv = [ - sys.argv[0], - "--mode", - "BST", - "--antennafield", - "HBA", - "--file", - dirname(__file__) + "/SDP_BST_statistics_packets.bin", - "--output_dir", - tmpdir, - ] - - with mock.patch.object( - entry.sys, "argv", default_writer_sys_argv + writer_argv - ): - entry.main() - - # check if file was written - self.assertTrue(isfile(f"{tmpdir}/BST_2022-05-20-11-08-44_HBA.h5")) - - # default arguments for statistics reader - default_reader_sys_argv = [ - sys.argv[0], - "--files", - f"{tmpdir}/BST_2022-05-20-11-08-44_HBA.h5", - "--start_time", - "2021-09-20#07:40:08.937", - "--end_time", - "2021-10-04#07:50:08.937", - ] - - # test statistics reader - with mock.patch.object(reader_entry.sys, "argv", default_reader_sys_argv): - stat_parser = reader_entry.setup_stat_parser() - bst_statistics = stat_parser.list_statistics() - self.assertIsNotNone(bst_statistics) - file_header = stat_parser.file_header - - return file_header - - def test_insert_tango_BST_statistics(self): - with TemporaryDirectory() as tmpdir: - writer_argv = [] - - with mock.patch.object( - entry, "_get_tango_device", self._mock_get_tango_device - ): - _ = self._run_writer_reader(tmpdir, writer_argv) - - # validate HDF5 content - with h5py.File(f"{tmpdir}/BST_2022-05-20-11-08-44_HBA.h5") as f: - # validate header - self.assertIn("station_version", dict(f.attrs)) - self.assertIn("writer_version", dict(f.attrs)) - self.assertIn("mode", dict(f.attrs)) - - # check for the datasets present in our input data - self.assertIn("BST_2022-05-20T11:08:45.000", dict(f.items())) - - def test_bst(self): - with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device): - with TemporaryDirectory() as tmpdir: - new_sys_argv = [ - sys.argv[0], - "--mode", - "BST", - "--antennafield", - "HBA", - "--file", - dirname(__file__) + "/SDP_BST_statistics_packets.bin", - "--output_dir", - tmpdir, - ] - with mock.patch.object(entry.sys, "argv", new_sys_argv): - entry.main() - - # check if file was written - self.assertTrue(isfile(f"{tmpdir}/BST_2022-05-20-11-08-44_HBA.h5")) - - -class TestStatisticsWriterXST(TestStatisticsReaderWriter): - """TestStatistics class for XST-mode""" - - def test_xst(self): - with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device): - with TemporaryDirectory() as tmpdir: - new_sys_argv = [ - sys.argv[0], - "--mode", - "XST", - "--antennafield", - "HBA", - "--file", - dirname(__file__) + "/SDP_XST_statistics_packets.bin", - "--output_dir", - tmpdir, - ] - with mock.patch.object(entry.sys, "argv", new_sys_argv): - entry.main() - - # check if file was written - self.assertTrue( - isfile(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5") - ) - - # validate HDF5 content - with h5py.File(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5") as f: - # validate header - self.assertIn("station_version", dict(f.attrs)) - self.assertIn("writer_version", dict(f.attrs)) - self.assertIn("mode", dict(f.attrs)) - - # check for the datasets present in our input data - self.assertIn("XST_2021-09-13T13:21:32.000", dict(f.items())) - self.assertIn("XST_2021-09-13T13:21:33.000", dict(f.items())) - self.assertIn("XST_2021-09-13T13:21:34.000", dict(f.items())) - self.assertIn("XST_2021-09-13T13:21:35.000", dict(f.items())) - self.assertIn("XST_2021-09-13T13:21:36.000", dict(f.items())) - self.assertIn("XST_2021-09-13T13:21:37.000", dict(f.items())) - self.assertIn("XST_2021-09-13T13:21:38.000", dict(f.items())) - - # check dataset dimensions, should match the actual - # number of antennas, and cover both polarisations. - nr_antennas = FakeAntennaFieldDeviceProxy.nr_antennas_R - self.assertEqual( - (nr_antennas * N_pol, nr_antennas * N_pol), - f["XST_2021-09-13T13:21:32.000"].shape, - ) - - # check dataset header - self.assertIn( - "timestamp", dict(f["XST_2021-09-13T13:21:32.000"].attrs) - ) - - # check compression - self.assertEqual( - f["XST_2021-09-13T13:21:32.000"].compression, "gzip" - ) - - def test_xst_multiple_subbands(self): - with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device): - with TemporaryDirectory() as tmpdir: - new_sys_argv = [ - sys.argv[0], - "--mode", - "XST", - "--antennafield", - "HBA", - "--file", - dirname(__file__) - + "/SDP_XST_statistics_packets_multiple_subbands.bin", - "--output_dir", - tmpdir, - ] - with mock.patch.object(entry.sys, "argv", new_sys_argv): - entry.main() - - # check if files were written - self.assertTrue( - isfile(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5") - ) - self.assertTrue( - isfile(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB103.h5") - ) - - def test_xst_with_antennafield(self): - with TemporaryDirectory() as tmpdir: - with mock.patch.object( - entry, "_get_tango_device", self._mock_get_tango_device - ): - new_sys_argv = [ - sys.argv[0], - "--mode", - "XST", - "--antennafield", - "HBA", - "--file", - dirname(__file__) + "/SDP_XST_statistics_packets.bin", - "--output_dir", - tmpdir, - ] - with mock.patch.object(entry.sys, "argv", new_sys_argv): - entry.main() - - # check if file was written - self.assertTrue( - isfile(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5") - ) - - # validate HDF5 content - with h5py.File(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5") as f: - # check extra header fields provided by the AntennaField - self.assertIn("antenna_names", dict(f.attrs)) - self.assertIn("antenna_quality", dict(f.attrs)) - self.assertIn("antenna_type", dict(f.attrs)) - self.assertIn("antennafield_device", dict(f.attrs)) - self.assertIn("rcu_pcb_id", dict(f.attrs)) - self.assertIn("rcu_pcb_version", dict(f.attrs)) - - # check dataset dimensions, should match the number of antennas, - # and cover both polarisations. - self.assertEqual((6, 6), f["XST_2021-09-13T13:21:32.000"].shape) - - -class TestDictToHdf5Attrs(base.TestCase): - def test_empty_dict(self): - self.assertEqual({}, hdf5._dict_to_hdf5_attrs({})) - - def test_int(self): - self.assertDictEqual({"a": 1}, hdf5._dict_to_hdf5_attrs({"a": 1})) - - def test_str(self): - self.assertDictEqual({"a": "b"}, hdf5._dict_to_hdf5_attrs({"a": "b"})) - - def test_none(self): - self.assertTrue( - h5py.Empty, hdf5._dict_to_hdf5_attrs({"a": None})["a"].__class__ - ) - - def test_nested_dict(self): - self.assertDictEqual({"a_b": 1}, hdf5._dict_to_hdf5_attrs({"a": {"b": 1}})) - - -class TestSelect1D(base.TestCase): - def setUp(self): - self.in_data = numpy.array(list(range(128))) - - def test_select1d_empty_list(self): - self.assertListEqual([], hdf5._select1d(self.in_data, []).tolist()) - - def test_select1d_select(self): - self.assertListEqual([2, 4], hdf5._select1d(self.in_data, [2, 4]).tolist()) - - def test_select1d_select_with_none(self): - self.assertListEqual( - [2, -1], hdf5._select1d(self.in_data, [2, None], none_value=-1).tolist() - ) - - -class TestSelect2D(base.TestCase): - def setUp(self): - # abuse complex numbers to create a 2D matrix with unique values - self.in_data = numpy.array([[x + y * 1j for x in range(16)] for y in range(8)]) - - def test_select2d_empty_list(self): - self.assertListEqual([[]], hdf5._select2d(self.in_data, []).tolist()) - - def test_select2d_select(self): - self.assertListEqual( - [[2 + 2j, 4 + 2j], [2 + 4j, 4 + 4j]], - hdf5._select2d(self.in_data, [2, 4]).tolist(), - ) - - def test_select2d_select_with_none(self): - self.assertListEqual( - [[2 + 2j, -1], [-1, -1]], - hdf5._select2d(self.in_data, [2, None], none_value=-1).tolist(), - )