From 5593e579dec56a128ce06b5090e8807fc93cacd3 Mon Sep 17 00:00:00 2001
From: Hannes Feldt <feldt@astron.nl>
Date: Wed, 5 Jun 2024 14:36:46 +0200
Subject: [PATCH] cleanup

---
 .../statistics/writer/hdf5.py                 |  29 +-
 tests/statistics/test_writer.py               | 469 ------------------
 2 files changed, 12 insertions(+), 486 deletions(-)
 delete mode 100644 tests/statistics/test_writer.py

diff --git a/lofar_station_client/statistics/writer/hdf5.py b/lofar_station_client/statistics/writer/hdf5.py
index d62a1f0..4989817 100644
--- a/lofar_station_client/statistics/writer/hdf5.py
+++ b/lofar_station_client/statistics/writer/hdf5.py
@@ -1,4 +1,4 @@
-#  Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
+#  Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
 #  SPDX-License-Identifier: Apache-2.0
 
 """Hdf5 packets file writer"""
@@ -29,7 +29,6 @@ from lofar_station_client.statistics.collectors._xst import XSTCollector
 from lofar_station_client.statistics.packets import StatisticsPacket
 from lofar_station_client.statistics.statistics_data import (
     StatisticsDataFile,
-    StatisticsData,
 )
 
 try:
@@ -378,7 +377,7 @@ class HDF5Writer(ABC):
                 matrix_name,
             )
 
-    def _add_device_metadata(self, matrix: StatisticsData):
+    def _add_device_metadata(self, matrix: numpy.ndarray):
         # add station state
         if self.antennafield_device:
             try:
@@ -464,7 +463,7 @@ class HDF5Writer(ABC):
         return matrix
 
     @abstractmethod
-    def get_matrix_data(self) -> StatisticsData:
+    def get_matrix_data(self) -> numpy.ndarray:
         """Abstract method"""
 
     def next_filename(self, timestamp, suffix=".h5"):
@@ -559,12 +558,12 @@ class SstHdf5Writer(HDF5Writer):
     def new_collector(self):
         return SSTCollector(self.nr_signal_inputs, self.first_signal_input_index)
 
-    def get_matrix(self) -> StatisticsData:
+    def get_matrix(self) -> numpy.ndarray:
         matrix = super().get_matrix()
         matrix.subbands = numpy.array(range(512))
         return matrix
 
-    def get_matrix_data(self) -> StatisticsData:
+    def get_matrix_data(self) -> numpy.ndarray:
         # first obtain all values from SDP
         all_values = self.current_collector.parameters["sst_values"].astype(
             numpy.float32
@@ -576,7 +575,7 @@ class SstHdf5Writer(HDF5Writer):
             else all_values
         )
 
-        return our_values.view(StatisticsData)
+        return our_values
 
 
 class BstHdf5Writer(HDF5Writer):
@@ -600,7 +599,7 @@ class BstHdf5Writer(HDF5Writer):
     def new_collector(self):
         return BSTCollector()
 
-    def get_matrix(self) -> StatisticsData:
+    def get_matrix(self) -> numpy.ndarray:
         matrix = super().get_matrix()
 
         if self.digitalbeam_device:
@@ -619,12 +618,8 @@ class BstHdf5Writer(HDF5Writer):
 
         return matrix
 
-    def get_matrix_data(self) -> StatisticsData:
-        return (
-            self.current_collector.parameters["bst_values"]
-            .astype(numpy.float32)
-            .view(StatisticsData)
-        )
+    def get_matrix_data(self) -> numpy.ndarray:
+        return self.current_collector.parameters["bst_values"].astype(numpy.float32)
 
 
 class XstHdf5Writer(HDF5Writer):
@@ -663,12 +658,12 @@ class XstHdf5Writer(HDF5Writer):
             f"{suffix}"
         )
 
-    def get_matrix(self) -> StatisticsData:
+    def get_matrix(self) -> numpy.ndarray:
         matrix = super().get_matrix()
         matrix.subbands = numpy.array([self.subband_index])
         return matrix
 
-    def get_matrix_data(self) -> StatisticsData:
+    def get_matrix_data(self) -> numpy.ndarray:
         # requires a function call to transform the xst_blocks in to the right
         # structure
         #
@@ -685,7 +680,7 @@ class XstHdf5Writer(HDF5Writer):
             else all_values
         )
 
-        return our_values.view(StatisticsData)
+        return our_values
 
 
 class ParallelXstHdf5Writer:
diff --git a/tests/statistics/test_writer.py b/tests/statistics/test_writer.py
deleted file mode 100644
index 60d1d2d..0000000
--- a/tests/statistics/test_writer.py
+++ /dev/null
@@ -1,469 +0,0 @@
-#  Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
-#  SPDX-License-Identifier: Apache-2.0
-
-import sys
-from os.path import dirname, isfile
-from tempfile import TemporaryDirectory
-from typing import Tuple
-from unittest import mock
-from unittest.mock import patch
-
-import h5py
-import numpy
-
-from lofar_station_client import __version__ as lsc_version
-from lofar_station_client.dts.constants import N_pol
-from lofar_station_client.statistics.reader import entry as reader_entry
-from lofar_station_client.statistics.statistics_data import (
-    StatisticsData,
-    StatisticsFileHeader,
-)
-from lofar_station_client.statistics.writer import (
-    entry,
-    hdf5,
-    __version__ as writer_version,
-)
-from tests import base
-from tests.test_devices import (
-    FakeAntennaFieldDeviceProxy,
-    FakeOffAntennaFieldDeviceProxy,
-    FakeDigitalBeamDeviceProxy,
-    FakeTileBeamDeviceProxy,
-    FakeSDPDeviceProxy,
-    FakeSDPFirmwareDeviceProxy,
-    FakeStationManagerDeviceProxy,
-)
-
-
-class TestStatisticsReaderWriter(base.TestCase):
-    """Parent TestStatistics class which exposes common internal methods"""
-
-    def _mock_get_tango_device(self, tango_disabled, host, device_name):
-        """Return our mocked DeviceProxies"""
-        if device_name == "STAT/AFH/HBA":
-            return FakeAntennaFieldDeviceProxy(device_name)
-        if device_name == "STAT/DigitalBeam/HBA":
-            return FakeDigitalBeamDeviceProxy(device_name)
-        if device_name == "STAT/TileBeam/HBA":
-            return FakeTileBeamDeviceProxy(device_name)
-        if device_name == "STAT/StationManager/1":
-            return FakeStationManagerDeviceProxy
-        if device_name == "STAT/SDP/HBA":
-            return FakeSDPDeviceProxy(device_name)
-        if device_name == "STAT/SDPFirmware/HBA":
-            return FakeSDPFirmwareDeviceProxy(device_name)
-        raise ValueError(
-            f"Device not mocked, and thus not available in this test: {device_name}"
-        )
-
-    def _mock_get_tango_device_off(self, tango_disabled, host, device_name):
-        """Return our mocked DeviceProxies that simulate a device that is off"""
-        if device_name == "STAT/AFH/HBA":
-            return FakeOffAntennaFieldDeviceProxy(device_name)
-        if device_name == "STAT/DigitalBeam/HBA":
-            return FakeDigitalBeamDeviceProxy(device_name)
-        if device_name == "STAT/TileBeam/HBA":
-            return FakeTileBeamDeviceProxy(device_name)
-        if device_name == "STAT/StationManager/1":
-            return FakeStationManagerDeviceProxy
-        if device_name == "STAT/SDP/HBA":
-            return FakeSDPDeviceProxy(device_name)
-        if device_name == "STAT/SDPFirmware/HBA":
-            return FakeSDPFirmwareDeviceProxy(device_name)
-        raise ValueError(
-            f"Device not mocked, and thus not available in this test: {device_name}"
-        )
-
-
-class TestStatisticsWriterVersion(TestStatisticsReaderWriter):
-    """TestStatistics class for print testing"""
-
-    @patch("builtins.print")  # Mock the print function
-    def test_version(self, mock_print):
-        """Print the stats-writer and lofar-station-client version"""
-        new_sys_argv = [sys.argv[0], "--version"]
-        with mock.patch.object(entry.sys, "argv", new_sys_argv):
-            entry.main()
-        mock_print.assert_called_with(
-            f"LOFAR Statistics Writer version: {writer_version} - "
-            f"LOFAR Station Client version: {lsc_version}"
-        )
-
-
-class TestStatisticsReaderWriterSST(TestStatisticsReaderWriter):
-    """TestStatistics class for SST-mode"""
-
-    def _run_writer_reader(
-        self, tmpdir: str, writer_argv: list, antennafield: str = "HBA"
-    ) -> Tuple[StatisticsData, StatisticsFileHeader]:
-        """Run the statistics writer with the given arguments,
-        and read and return the output."""
-        # default arguments for statistics writer
-        default_writer_sys_argv = [
-            sys.argv[0],
-            "--mode",
-            "SST",
-            "--file",
-            dirname(__file__) + "/SDP_SST_statistics_packets.bin",
-            "--output_dir",
-            tmpdir,
-        ]
-
-        if antennafield != "unknown":
-            default_writer_sys_argv += ["--antennafield", antennafield]
-
-        with mock.patch.object(
-            entry.sys, "argv", default_writer_sys_argv + writer_argv
-        ):
-            entry.main()
-
-        # check if file was written
-        self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40_{antennafield}.h5"))
-
-        # default arguments for statistics reader
-        default_reader_sys_argv = [
-            sys.argv[0],
-            "--files",
-            f"{tmpdir}/SST_2021-09-20-12-17-40_{antennafield}.h5",
-            "--start_time",
-            "2021-09-20#07:40:08.937",
-            "--end_time",
-            "2021-10-04#07:50:08.937",
-        ]
-
-        # test statistics reader
-        with mock.patch.object(reader_entry.sys, "argv", default_reader_sys_argv):
-            stat_parser = reader_entry.setup_stat_parser()
-            sst_statistics = stat_parser.list_statistics()
-            self.assertIsNotNone(sst_statistics)
-            stat = stat_parser.get_statistic(
-                "2021-09-20T12:17:40.000000"
-            )  # same as stat_parser.statistics[0]
-            file_header = stat_parser.file_header
-            self.assertIsNotNone(stat)
-
-        return stat, file_header
-
-    def test_header_info(self):
-        """Test whether the header info are inserted and collected in the proper way"""
-        with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device):
-            with TemporaryDirectory() as tmpdir:
-                _, file_header = self._run_writer_reader(tmpdir, [])
-
-                self.assertIsNotNone(file_header.station_version)
-                self.assertIsNotNone(file_header.writer_version)
-                self.assertEqual("SST", file_header.mode)
-
-    def test_insert_tango_SST_statistics(self):
-        with TemporaryDirectory() as tmpdir:
-            writer_argv = []
-
-            with mock.patch.object(
-                entry, "_get_tango_device", self._mock_get_tango_device
-            ):
-                stat, file_header = self._run_writer_reader(tmpdir, writer_argv)
-
-            self.assertEqual(121, stat.data_id_signal_input_index)
-
-            # Test some AntennField attributes, whether they match our mock
-            self.assertEqual("HBA", file_header.antenna_type)
-            self.assertListEqual(
-                ["OK", "OK", "OK"], file_header.antenna_quality.tolist()
-            )
-
-    def test_no_tango_SST_statistics(self):
-        with TemporaryDirectory() as tmpdir:
-            writer_argv = [
-                "--no-tango",
-            ]
-
-            _ = self._run_writer_reader(tmpdir, writer_argv, "unknown")
-
-    def test_SST_statistics_with_device_in_off(self):
-        with TemporaryDirectory() as tmpdir:
-            writer_argv = []
-
-            with mock.patch.object(
-                entry, "_get_tango_device", self._mock_get_tango_device_off
-            ):
-                _ = self._run_writer_reader(tmpdir, writer_argv)
-
-    def test_station_name(self):
-        with TemporaryDirectory() as tmpdir:
-            writer_argv = []
-
-            with mock.patch.object(
-                entry, "_get_tango_device", self._mock_get_tango_device
-            ):
-                _, file_header = self._run_writer_reader(tmpdir, writer_argv)
-
-            self.assertEqual("DevStation", file_header.station_name)
-
-
-class TestStatisticsReaderWriterBST(TestStatisticsReaderWriter):
-    """TestStatistics class for BST-mode"""
-
-    def _run_writer_reader(
-        self, tmpdir: str, writer_argv: list
-    ) -> Tuple[StatisticsData, StatisticsFileHeader]:
-        """Run the statistics writer with the given arguments,
-        and read and return the output."""
-        # default arguments for statistics writer
-        default_writer_sys_argv = [
-            sys.argv[0],
-            "--mode",
-            "BST",
-            "--antennafield",
-            "HBA",
-            "--file",
-            dirname(__file__) + "/SDP_BST_statistics_packets.bin",
-            "--output_dir",
-            tmpdir,
-        ]
-
-        with mock.patch.object(
-            entry.sys, "argv", default_writer_sys_argv + writer_argv
-        ):
-            entry.main()
-
-        # check if file was written
-        self.assertTrue(isfile(f"{tmpdir}/BST_2022-05-20-11-08-44_HBA.h5"))
-
-        # default arguments for statistics reader
-        default_reader_sys_argv = [
-            sys.argv[0],
-            "--files",
-            f"{tmpdir}/BST_2022-05-20-11-08-44_HBA.h5",
-            "--start_time",
-            "2021-09-20#07:40:08.937",
-            "--end_time",
-            "2021-10-04#07:50:08.937",
-        ]
-
-        # test statistics reader
-        with mock.patch.object(reader_entry.sys, "argv", default_reader_sys_argv):
-            stat_parser = reader_entry.setup_stat_parser()
-            bst_statistics = stat_parser.list_statistics()
-            self.assertIsNotNone(bst_statistics)
-            file_header = stat_parser.file_header
-
-        return file_header
-
-    def test_insert_tango_BST_statistics(self):
-        with TemporaryDirectory() as tmpdir:
-            writer_argv = []
-
-            with mock.patch.object(
-                entry, "_get_tango_device", self._mock_get_tango_device
-            ):
-                _ = self._run_writer_reader(tmpdir, writer_argv)
-
-            # validate HDF5 content
-            with h5py.File(f"{tmpdir}/BST_2022-05-20-11-08-44_HBA.h5") as f:
-                # validate header
-                self.assertIn("station_version", dict(f.attrs))
-                self.assertIn("writer_version", dict(f.attrs))
-                self.assertIn("mode", dict(f.attrs))
-
-                # check for the datasets present in our input data
-                self.assertIn("BST_2022-05-20T11:08:45.000", dict(f.items()))
-
-    def test_bst(self):
-        with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device):
-            with TemporaryDirectory() as tmpdir:
-                new_sys_argv = [
-                    sys.argv[0],
-                    "--mode",
-                    "BST",
-                    "--antennafield",
-                    "HBA",
-                    "--file",
-                    dirname(__file__) + "/SDP_BST_statistics_packets.bin",
-                    "--output_dir",
-                    tmpdir,
-                ]
-                with mock.patch.object(entry.sys, "argv", new_sys_argv):
-                    entry.main()
-
-                # check if file was written
-                self.assertTrue(isfile(f"{tmpdir}/BST_2022-05-20-11-08-44_HBA.h5"))
-
-
-class TestStatisticsWriterXST(TestStatisticsReaderWriter):
-    """TestStatistics class for XST-mode"""
-
-    def test_xst(self):
-        with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device):
-            with TemporaryDirectory() as tmpdir:
-                new_sys_argv = [
-                    sys.argv[0],
-                    "--mode",
-                    "XST",
-                    "--antennafield",
-                    "HBA",
-                    "--file",
-                    dirname(__file__) + "/SDP_XST_statistics_packets.bin",
-                    "--output_dir",
-                    tmpdir,
-                ]
-                with mock.patch.object(entry.sys, "argv", new_sys_argv):
-                    entry.main()
-
-                # check if file was written
-                self.assertTrue(
-                    isfile(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5")
-                )
-
-                # validate HDF5 content
-                with h5py.File(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5") as f:
-                    # validate header
-                    self.assertIn("station_version", dict(f.attrs))
-                    self.assertIn("writer_version", dict(f.attrs))
-                    self.assertIn("mode", dict(f.attrs))
-
-                    # check for the datasets present in our input data
-                    self.assertIn("XST_2021-09-13T13:21:32.000", dict(f.items()))
-                    self.assertIn("XST_2021-09-13T13:21:33.000", dict(f.items()))
-                    self.assertIn("XST_2021-09-13T13:21:34.000", dict(f.items()))
-                    self.assertIn("XST_2021-09-13T13:21:35.000", dict(f.items()))
-                    self.assertIn("XST_2021-09-13T13:21:36.000", dict(f.items()))
-                    self.assertIn("XST_2021-09-13T13:21:37.000", dict(f.items()))
-                    self.assertIn("XST_2021-09-13T13:21:38.000", dict(f.items()))
-
-                    # check dataset dimensions, should match the actual
-                    # number of antennas, and cover both polarisations.
-                    nr_antennas = FakeAntennaFieldDeviceProxy.nr_antennas_R
-                    self.assertEqual(
-                        (nr_antennas * N_pol, nr_antennas * N_pol),
-                        f["XST_2021-09-13T13:21:32.000"].shape,
-                    )
-
-                    # check dataset header
-                    self.assertIn(
-                        "timestamp", dict(f["XST_2021-09-13T13:21:32.000"].attrs)
-                    )
-
-                    # check compression
-                    self.assertEqual(
-                        f["XST_2021-09-13T13:21:32.000"].compression, "gzip"
-                    )
-
-    def test_xst_multiple_subbands(self):
-        with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device):
-            with TemporaryDirectory() as tmpdir:
-                new_sys_argv = [
-                    sys.argv[0],
-                    "--mode",
-                    "XST",
-                    "--antennafield",
-                    "HBA",
-                    "--file",
-                    dirname(__file__)
-                    + "/SDP_XST_statistics_packets_multiple_subbands.bin",
-                    "--output_dir",
-                    tmpdir,
-                ]
-                with mock.patch.object(entry.sys, "argv", new_sys_argv):
-                    entry.main()
-
-                # check if files were written
-                self.assertTrue(
-                    isfile(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5")
-                )
-                self.assertTrue(
-                    isfile(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB103.h5")
-                )
-
-    def test_xst_with_antennafield(self):
-        with TemporaryDirectory() as tmpdir:
-            with mock.patch.object(
-                entry, "_get_tango_device", self._mock_get_tango_device
-            ):
-                new_sys_argv = [
-                    sys.argv[0],
-                    "--mode",
-                    "XST",
-                    "--antennafield",
-                    "HBA",
-                    "--file",
-                    dirname(__file__) + "/SDP_XST_statistics_packets.bin",
-                    "--output_dir",
-                    tmpdir,
-                ]
-                with mock.patch.object(entry.sys, "argv", new_sys_argv):
-                    entry.main()
-
-                # check if file was written
-                self.assertTrue(
-                    isfile(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5")
-                )
-
-                # validate HDF5 content
-                with h5py.File(f"{tmpdir}/XST_2021-09-13-13-21-32_HBA_SB102.h5") as f:
-                    # check extra header fields provided by the AntennaField
-                    self.assertIn("antenna_names", dict(f.attrs))
-                    self.assertIn("antenna_quality", dict(f.attrs))
-                    self.assertIn("antenna_type", dict(f.attrs))
-                    self.assertIn("antennafield_device", dict(f.attrs))
-                    self.assertIn("rcu_pcb_id", dict(f.attrs))
-                    self.assertIn("rcu_pcb_version", dict(f.attrs))
-
-                    # check dataset dimensions, should match the number of antennas,
-                    # and cover both polarisations.
-                    self.assertEqual((6, 6), f["XST_2021-09-13T13:21:32.000"].shape)
-
-
-class TestDictToHdf5Attrs(base.TestCase):
-    def test_empty_dict(self):
-        self.assertEqual({}, hdf5._dict_to_hdf5_attrs({}))
-
-    def test_int(self):
-        self.assertDictEqual({"a": 1}, hdf5._dict_to_hdf5_attrs({"a": 1}))
-
-    def test_str(self):
-        self.assertDictEqual({"a": "b"}, hdf5._dict_to_hdf5_attrs({"a": "b"}))
-
-    def test_none(self):
-        self.assertTrue(
-            h5py.Empty, hdf5._dict_to_hdf5_attrs({"a": None})["a"].__class__
-        )
-
-    def test_nested_dict(self):
-        self.assertDictEqual({"a_b": 1}, hdf5._dict_to_hdf5_attrs({"a": {"b": 1}}))
-
-
-class TestSelect1D(base.TestCase):
-    def setUp(self):
-        self.in_data = numpy.array(list(range(128)))
-
-    def test_select1d_empty_list(self):
-        self.assertListEqual([], hdf5._select1d(self.in_data, []).tolist())
-
-    def test_select1d_select(self):
-        self.assertListEqual([2, 4], hdf5._select1d(self.in_data, [2, 4]).tolist())
-
-    def test_select1d_select_with_none(self):
-        self.assertListEqual(
-            [2, -1], hdf5._select1d(self.in_data, [2, None], none_value=-1).tolist()
-        )
-
-
-class TestSelect2D(base.TestCase):
-    def setUp(self):
-        # abuse complex numbers to create a 2D matrix with unique values
-        self.in_data = numpy.array([[x + y * 1j for x in range(16)] for y in range(8)])
-
-    def test_select2d_empty_list(self):
-        self.assertListEqual([[]], hdf5._select2d(self.in_data, []).tolist())
-
-    def test_select2d_select(self):
-        self.assertListEqual(
-            [[2 + 2j, 4 + 2j], [2 + 4j, 4 + 4j]],
-            hdf5._select2d(self.in_data, [2, 4]).tolist(),
-        )
-
-    def test_select2d_select_with_none(self):
-        self.assertListEqual(
-            [[2 + 2j, -1], [-1, -1]],
-            hdf5._select2d(self.in_data, [2, None], none_value=-1).tolist(),
-        )
-- 
GitLab