diff --git a/README.md b/README.md index 0aba8c99bf57f82767a660d269f202a6bb504766..8642e393f57e536d87f4bf7ed3ad5d37380ef20e 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,7 @@ tox -e debug tests.requests.test_prometheus ``` ## Releasenotes +- 0.14.2 - Added `station_name` attribute to Hdf5 file header - 0.14.1 - Added `beamlet.subband_select_RW` attribute to BSTHdf5Writer - 0.14 - Added new attributes to statistics HDF file as well as documentation - 0.13 - Added lazy connection behavior to `devices.LofarDeviceProxy` class diff --git a/VERSION b/VERSION index 930e3000bdc9aaa03a5a26831c271dd32d494f61..e867cc2a66a8b9ca98fa54db45cabbcc78747603 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.14.1 +0.14.2 diff --git a/lofar_station_client/statistics/statistics_data.py b/lofar_station_client/statistics/statistics_data.py index 177f66af0a31d8e401b616424e3507f612327cdd..b7a99fa08a6a95f3503d4df33b381f05b8eb78c0 100644 --- a/lofar_station_client/statistics/statistics_data.py +++ b/lofar_station_client/statistics/statistics_data.py @@ -8,7 +8,7 @@ Models the structure of an HDF statistics file. """ import inspect -from typing import Dict +from typing import Dict, List from numpy import ndarray @@ -186,7 +186,7 @@ class StatisticsFileHeader: antenna_reference_itrf: str = attribute(optional=True) """ Absolute reference position of each tile, in ITRF (XYZ) """ - rcu_attenuator_db: [float] = attribute(name="rcu_attenuator_dB", optional=True) + rcu_attenuator_db: List[float] = attribute(name="rcu_attenuator_dB", optional=True) """ Amount of dB with which each antenna signal must be adjusted to line up. """ rcu_band_select: float = attribute(optional=True) @@ -201,7 +201,7 @@ class StatisticsFileHeader: fpga_firmware_version: str = attribute(optional=True) fpga_hardware_version: str = attribute(optional=True) - rcu_pcb_id: str = attribute(optional=True) + rcu_pcb_id: int = attribute(optional=True) rcu_pcb_version: str = attribute(optional=True) def __eq__(self, other): diff --git a/lofar_station_client/statistics/writer/entry.py b/lofar_station_client/statistics/writer/entry.py index b064039733e9776245a54fe4ada95721bad970e3..5d21186c397e693f7d0dfe564f2689b2e420dab3 100644 --- a/lofar_station_client/statistics/writer/entry.py +++ b/lofar_station_client/statistics/writer/entry.py @@ -4,13 +4,14 @@ """Statistics writer parser and executor""" # too-many-locals, broad-except, raise-missing-from, -# too-many-branches, too-many-arguments -# pylint: disable=R0914, W0703, W0707, R0912, R0913 +# too-many-branches +# pylint: disable=R0914, W0703, W0707, R0912 import argparse import logging import sys import time +from typing import Dict from tango import DeviceProxy @@ -132,6 +133,14 @@ def _create_parser(): default="", help="Digitalbeam device to collect data for", ) + parser.add_argument( + "-SM", + "--stationmanager", + type=str, + choices=["", "1"], + default="1", + help="StationManager device to collect data for", + ) return parser @@ -150,10 +159,7 @@ def _create_writer( interval, output_dir, decimation, - antennafield_device: DeviceProxy = None, - sdp_device: DeviceProxy = None, - tilebeam_device: DeviceProxy = None, - digitalbeam_device: DeviceProxy = None, + devices: Dict[str, DeviceProxy], ): """Create the writer""" if mode == "XST": @@ -161,25 +167,21 @@ def _create_writer( new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation, - antennafield_device=antennafield_device, - sdp_device=sdp_device, - tilebeam_device=tilebeam_device, + devices=devices, ) if mode == "SST": return SstHdf5Writer( new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation, - antennafield_device=antennafield_device, - sdp_device=sdp_device, - tilebeam_device=tilebeam_device, + devices=devices, ) if mode == "BST": return BstHdf5Writer( new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation, - digitalbeam_device=digitalbeam_device, + devices=devices, ) logger.fatal("Invalid mode: %s", mode) sys.exit(1) @@ -271,37 +273,48 @@ def main(): logger.setLevel(logging.DEBUG) logger.debug("Setting loglevel to DEBUG") + # Create tango devices dictionary + devices = { + "antennafield": None, + "sdp": None, + "tilebeam": None, + "digitalbeam": None, + "stationmanager": None, + } + if args.antennafield: antennafield_device = _get_tango_device( tango_disabled, host, f"STAT/AntennaField/{args.antennafield}" ) + devices["antennafield"] = antennafield_device if antennafield_device and filename: logger.warning( "Combining live metadata from AntennaField \ device with statistics read from disk." ) - else: - antennafield_device = None if args.sdp: sdp_device = _get_tango_device(tango_disabled, host, f"STAT/SDP/{args.sdp}") - else: - sdp_device = None + devices["sdp"] = sdp_device if args.tilebeam: tilebeam_device = _get_tango_device( tango_disabled, host, f"STAT/TileBeam/{args.tilebeam}" ) - else: - tilebeam_device = None + devices["tilebeam"] = tilebeam_device if args.digitalbeam: digitalbeam_device = _get_tango_device( tango_disabled, host, f"STAT/DigitalBeam/{args.digitalbeam}" ) - else: - digitalbeam_device = None + devices["digitalbeam"] = digitalbeam_device + + if args.stationmanager: + stationmanager_device = _get_tango_device( + tango_disabled, host, f"STAT/StationManager/{args.stationmanager}" + ) + devices["stationmanager"] = stationmanager_device # creates the TCP receiver that is given to the writer receiver = _create_receiver(filename, host, port) @@ -312,10 +325,7 @@ def main(): interval, output_dir, decimation, - antennafield_device, - sdp_device, - tilebeam_device, - digitalbeam_device, + devices, ) # start looping diff --git a/lofar_station_client/statistics/writer/hdf5.py b/lofar_station_client/statistics/writer/hdf5.py index 21a5ec5650a0e072bd19a3aa7c2ee198eddbccab..c5796abfb0316ed339d75d9ae2de2994d3324399 100644 --- a/lofar_station_client/statistics/writer/hdf5.py +++ b/lofar_station_client/statistics/writer/hdf5.py @@ -11,7 +11,7 @@ import logging from abc import ABC, abstractmethod from datetime import datetime, timedelta from itertools import chain -from typing import TypeVar, List +from typing import TypeVar, List, Dict # python hdf5 import h5py @@ -49,9 +49,15 @@ __all__ = [ def _get_station_version(device: DeviceProxy) -> str: """Retrieve the Lofar Station Control version""" try: - if device is not None: - return device.version_R + return device.version_R + except Exception: return "" + + +def _get_station_name(device: DeviceProxy) -> str: + """Retrieve the Station name from the StationManager device""" + try: + return device.station_name_R except Exception: return "" @@ -126,10 +132,7 @@ class HDF5Writer(ABC): file_location, statistics_mode, decimation_factor, - antennafield_device: DeviceProxy = None, - sdp_device: DeviceProxy = None, - tilebeam_device: DeviceProxy = None, - digitalbeam_device: DeviceProxy = None, + devices: Dict[str, DeviceProxy], ): # all variables that deal with the matrix that's currently being decoded self.file: StatisticsDataFile = None @@ -156,10 +159,11 @@ class HDF5Writer(ABC): self.mode = statistics_mode.upper() # Set devices if any, defaults to None - self.antennafield_device = antennafield_device - self.sdp_device = sdp_device - self.tilebeam_device = tilebeam_device - self.digitalbeam_device = digitalbeam_device + self.antennafield_device = devices["antennafield"] + self.sdp_device = devices["sdp"] + self.tilebeam_device = devices["tilebeam"] + self.digitalbeam_device = devices["digitalbeam"] + self.stationmanager_device = devices["stationmanager"] # By default, select all the values from SDP self.antenna_selection: List[int] = None @@ -171,7 +175,9 @@ class HDF5Writer(ABC): self.antennafield_device.Antenna_to_SDP_Mapping_R ) except DevFailed: - logger.exception("Failed to read from %s", antennafield_device.name()) + logger.exception( + "Failed to read from %s", devices["antennafield"].name() + ) else: # select the values from SDP that represent the antennas in this field self.antenna_selection = list( @@ -189,7 +195,7 @@ class HDF5Writer(ABC): def set_file_header(self): """Returns the header fields per HDF5 file.""" - # self.file.station_name + self.file.station_name = _get_station_name(self.stationmanager_device) self.file.station_version = _get_station_version(self.antennafield_device) self.file.writer_version = _get_writer_version() self.file.mode = self.mode @@ -205,6 +211,8 @@ class HDF5Writer(ABC): self.file.rcu_band_select = self.antennafield_device.RCU_band_select_R self.file.rcu_dth_on = self.antennafield_device.RCU_DTH_on_R self.file.rcu_dth_freq = self.antennafield_device.RCU_DTH_freq_R + self.file.rcu_pcb_id = self.antennafield_device.RCU_PCB_ID_R + self.file.rcu_pcb_version = self.antennafield_device.RCU_PCB_version_R self.file.antenna_quality = ( self.antennafield_device.Antenna_Quality_str_R ) # noqa @@ -514,19 +522,14 @@ class SstHdf5Writer(HDF5Writer): new_file_time_interval, file_location, decimation_factor, - antennafield_device: DeviceProxy = None, - sdp_device: DeviceProxy = None, - tilebeam_device: DeviceProxy = None, + devices: Dict[str, DeviceProxy], ): super().__init__( new_file_time_interval, file_location, HDF5Writer.SST_MODE, decimation_factor, - antennafield_device=antennafield_device, - sdp_device=sdp_device, - tilebeam_device=tilebeam_device, - digitalbeam_device=None, + devices=devices, ) def decoder(self, packet): @@ -561,17 +564,14 @@ class BstHdf5Writer(HDF5Writer): new_file_time_interval, file_location, decimation_factor, - digitalbeam_device: DeviceProxy = None, + devices: Dict[str, DeviceProxy], ): super().__init__( new_file_time_interval, file_location, HDF5Writer.BST_MODE, decimation_factor, - antennafield_device=None, - sdp_device=None, - tilebeam_device=None, - digitalbeam_device=digitalbeam_device, + devices=devices, ) def decoder(self, packet): @@ -596,9 +596,7 @@ class XstHdf5Writer(HDF5Writer): new_file_time_interval, file_location, decimation_factor, - antennafield_device, - sdp_device, - tilebeam_device, + devices: Dict[str, DeviceProxy], subband_index, ): super().__init__( @@ -606,10 +604,7 @@ class XstHdf5Writer(HDF5Writer): file_location, HDF5Writer.XST_MODE, decimation_factor, - antennafield_device, - tilebeam_device, - sdp_device, - digitalbeam_device=None, + devices=devices, ) self.subband_index = subband_index @@ -659,9 +654,7 @@ class ParallelXstHdf5Writer: new_file_time_interval, file_location, decimation_factor, - antennafield_device, - sdp_device, - tilebeam_device, + devices: Dict[str, DeviceProxy], ): # maintain a dedicated HDF5Writer per subband self.writers = {} @@ -673,9 +666,7 @@ class ParallelXstHdf5Writer: new_file_time_interval, file_location, decimation_factor, - antennafield_device, - sdp_device, - tilebeam_device, + devices, subband, ) diff --git a/tests/observation/test_multi_station_observation.py b/tests/observation/test_multi_station_observation.py index b01cbf14d4d13830b7424030695307caab35a4d7..619918632ebe16676f6eaf77bad609d4b30d0653 100644 --- a/tests/observation/test_multi_station_observation.py +++ b/tests/observation/test_multi_station_observation.py @@ -6,23 +6,21 @@ # See LICENSE.txt for more info. from unittest import mock - - -from tests import base +import concurrent.futures from json import loads +from tests import base from lofar_station_client.observation.multi_station_observation import ( MultiStationObservation, ) -import concurrent.futures - SPEC_DICT = loads( """ { "observation_id": 12345, "stop_time": "2106-02-07T00:00:00", + "antenna_set": "ALL", "antenna_mask": [0,1,2,9], "filter": "HBA_110_190", "SAPs": [{ diff --git a/tests/statistics/test_writer.py b/tests/statistics/test_writer.py index 6102b485805211b60a685636f21dc02a1834e4d5..aaa98007da4e29d4c8381731546c2d05969512c9 100644 --- a/tests/statistics/test_writer.py +++ b/tests/statistics/test_writer.py @@ -4,6 +4,7 @@ from os.path import dirname, isfile from tempfile import TemporaryDirectory from unittest import mock +from typing import Tuple import sys import h5py import numpy @@ -18,6 +19,7 @@ from tests.test_devices import ( FakeAntennaFieldDeviceProxy, FakeOffAntennaFieldDeviceProxy, FakeDigitalBeamDeviceProxy, + FakeStationManagerDeviceProxy, ) from tests import base @@ -31,6 +33,8 @@ class TestStatisticsReaderWriter(base.TestCase): return FakeAntennaFieldDeviceProxy(device_name) if device_name == "STAT/DigitalBeam/HBA": return FakeDigitalBeamDeviceProxy(device_name) + if device_name == "STAT/StationManager/1": + return FakeStationManagerDeviceProxy raise ValueError( f"Device not mocked, and thus not available in this test: {device_name}" ) @@ -39,16 +43,19 @@ class TestStatisticsReaderWriter(base.TestCase): """Return our mocked DeviceProxies that simulate a device that is off""" if device_name == "STAT/AntennaField/LBA": return FakeOffAntennaFieldDeviceProxy(device_name) - + if device_name == "STAT/StationManager/1": + return FakeStationManagerDeviceProxy raise ValueError( f"Device not mocked, and thus not available in this test: {device_name}" ) class TestStatisticsReaderWriterSST(TestStatisticsReaderWriter): + """TestStatistics class for SST-mode""" + def _run_writer_reader( self, tmpdir: str, writer_argv: list - ) -> (StatisticsData, StatisticsFileHeader): + ) -> Tuple[StatisticsData, StatisticsFileHeader]: """Run the statistics writer with the given arguments, and read and return the output.""" # default arguments for statistics writer @@ -97,12 +104,13 @@ class TestStatisticsReaderWriterSST(TestStatisticsReaderWriter): def test_header_info(self): """Test whether the header info are inserted and collected in the proper way""" - with TemporaryDirectory() as tmpdir: - _, file_header = self._run_writer_reader(tmpdir, []) + with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device): + with TemporaryDirectory() as tmpdir: + _, file_header = self._run_writer_reader(tmpdir, []) - self.assertIsNotNone(file_header.station_version) - self.assertIsNotNone(file_header.writer_version) - self.assertEqual("SST", file_header.mode) + self.assertIsNotNone(file_header.station_version) + self.assertIsNotNone(file_header.writer_version) + self.assertEqual("SST", file_header.mode) def test_insert_tango_SST_statistics(self): with TemporaryDirectory() as tmpdir: @@ -144,11 +152,27 @@ class TestStatisticsReaderWriterSST(TestStatisticsReaderWriter): ): _ = self._run_writer_reader(tmpdir, writer_argv) + def test_station_name(self): + with TemporaryDirectory() as tmpdir: + writer_argv = [ + "--stationmanager", + "1", + ] + + with mock.patch.object( + entry, "_get_tango_device", self._mock_get_tango_device + ): + _, file_header = self._run_writer_reader(tmpdir, writer_argv) + + self.assertEqual("DevStation", file_header.station_name) + class TestStatisticsReaderWriterBST(TestStatisticsReaderWriter): + """TestStatistics class for BST-mode""" + def _run_writer_reader( self, tmpdir: str, writer_argv: list - ) -> (StatisticsData, StatisticsFileHeader): + ) -> Tuple[StatisticsData, StatisticsFileHeader]: """Run the statistics writer with the given arguments, and read and return the output.""" # default arguments for statistics writer @@ -206,71 +230,95 @@ class TestStatisticsReaderWriterBST(TestStatisticsReaderWriter): # Test some AntennField attributes, whether they match our mock self.assertListEqual(list(range(0, 488)), file_header.subbands.tolist()) + def test_bst(self): + with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device): + with TemporaryDirectory() as tmpdir: + new_sys_argv = [ + sys.argv[0], + "--mode", + "BST", + "--file", + dirname(__file__) + "/SDP_BST_statistics_packets.bin", + "--output_dir", + tmpdir, + ] + with mock.patch.object(entry.sys, "argv", new_sys_argv): + with self.assertRaises(SystemExit): + entry.main() + + # check if file was written + self.assertTrue(isfile(f"{tmpdir}/BST_2022-05-20-11-08-44.h5")) + + +class TestStatisticsWriterXST(TestStatisticsReaderWriter): + """TestStatistics class for XST-mode""" -class TestStatisticsWriter(base.TestCase): def test_xst(self): - with TemporaryDirectory() as tmpdir: - new_sys_argv = [ - sys.argv[0], - "--mode", - "XST", - "--file", - dirname(__file__) + "/SDP_XST_statistics_packets.bin", - "--output_dir", - tmpdir, - ] - with mock.patch.object(entry.sys, "argv", new_sys_argv): - with self.assertRaises(SystemExit): - entry.main() - - # check if file was written - self.assertTrue(isfile(f"{tmpdir}/XST_SB102_2021-09-13-13-21-32.h5")) - - # validate HDF5 content - with h5py.File(f"{tmpdir}/XST_SB102_2021-09-13-13-21-32.h5") as f: - # validate header - self.assertIn("station_version", dict(f.attrs)) - self.assertIn("writer_version", dict(f.attrs)) - self.assertIn("mode", dict(f.attrs)) - - # check for the datasets present in our input data - self.assertIn("XST_2021-09-13T13:21:32.000+00:00", dict(f.items())) - self.assertIn("XST_2021-09-13T13:21:32.999+00:00", dict(f.items())) - self.assertIn("XST_2021-09-13T13:21:34.000+00:00", dict(f.items())) - self.assertIn("XST_2021-09-13T13:21:34.999+00:00", dict(f.items())) - self.assertIn("XST_2021-09-13T13:21:36.000+00:00", dict(f.items())) - self.assertIn("XST_2021-09-13T13:21:36.999+00:00", dict(f.items())) - self.assertIn("XST_2021-09-13T13:21:38.000+00:00", dict(f.items())) - - # check dataset dimensions, should match the maximum number of antennas, - # and cover both polarisations. - self.assertEqual( - (192, 192), f["XST_2021-09-13T13:21:32.000+00:00"].shape - ) - - # check dataset header - self.assertIn( - "timestamp", dict(f["XST_2021-09-13T13:21:32.000+00:00"].attrs) - ) + with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device): + with TemporaryDirectory() as tmpdir: + new_sys_argv = [ + sys.argv[0], + "--mode", + "XST", + "--file", + dirname(__file__) + "/SDP_XST_statistics_packets.bin", + "--output_dir", + tmpdir, + ] + with mock.patch.object(entry.sys, "argv", new_sys_argv): + with self.assertRaises(SystemExit): + entry.main() + + # check if file was written + self.assertTrue(isfile(f"{tmpdir}/XST_SB102_2021-09-13-13-21-32.h5")) + + # validate HDF5 content + with h5py.File(f"{tmpdir}/XST_SB102_2021-09-13-13-21-32.h5") as f: + # validate header + self.assertIn("station_version", dict(f.attrs)) + self.assertIn("writer_version", dict(f.attrs)) + self.assertIn("mode", dict(f.attrs)) + + # check for the datasets present in our input data + self.assertIn("XST_2021-09-13T13:21:32.000+00:00", dict(f.items())) + self.assertIn("XST_2021-09-13T13:21:32.999+00:00", dict(f.items())) + self.assertIn("XST_2021-09-13T13:21:34.000+00:00", dict(f.items())) + self.assertIn("XST_2021-09-13T13:21:34.999+00:00", dict(f.items())) + self.assertIn("XST_2021-09-13T13:21:36.000+00:00", dict(f.items())) + self.assertIn("XST_2021-09-13T13:21:36.999+00:00", dict(f.items())) + self.assertIn("XST_2021-09-13T13:21:38.000+00:00", dict(f.items())) + + # check dataset dimensions, should match the maximum + # number of antennas, and cover both polarisations. + self.assertEqual( + (192, 192), f["XST_2021-09-13T13:21:32.000+00:00"].shape + ) + + # check dataset header + self.assertIn( + "timestamp", dict(f["XST_2021-09-13T13:21:32.000+00:00"].attrs) + ) def test_xst_multiple_subbands(self): - with TemporaryDirectory() as tmpdir: - new_sys_argv = [ - sys.argv[0], - "--mode", - "XST", - "--file", - dirname(__file__) + "/SDP_XST_statistics_packets_multiple_subbands.bin", - "--output_dir", - tmpdir, - ] - with mock.patch.object(entry.sys, "argv", new_sys_argv): - with self.assertRaises(SystemExit): - entry.main() + with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device): + with TemporaryDirectory() as tmpdir: + new_sys_argv = [ + sys.argv[0], + "--mode", + "XST", + "--file", + dirname(__file__) + + "/SDP_XST_statistics_packets_multiple_subbands.bin", + "--output_dir", + tmpdir, + ] + with mock.patch.object(entry.sys, "argv", new_sys_argv): + with self.assertRaises(SystemExit): + entry.main() - # check if files were written - self.assertTrue(isfile(f"{tmpdir}/XST_SB102_2021-09-13-13-21-32.h5")) - self.assertTrue(isfile(f"{tmpdir}/XST_SB103_2021-09-13-13-21-32.h5")) + # check if files were written + self.assertTrue(isfile(f"{tmpdir}/XST_SB102_2021-09-13-13-21-32.h5")) + self.assertTrue(isfile(f"{tmpdir}/XST_SB103_2021-09-13-13-21-32.h5")) def test_xst_with_antennafield(self): with TemporaryDirectory() as tmpdir: @@ -305,6 +353,8 @@ class TestStatisticsWriter(base.TestCase): self.assertIn("rcu_band_select", dict(f.attrs)) self.assertIn("rcu_dth_on", dict(f.attrs)) self.assertIn("rcu_dth_freq", dict(f.attrs)) + self.assertIn("rcu_pcb_id", dict(f.attrs)) + self.assertIn("rcu_pcb_version", dict(f.attrs)) # check dataset dimensions, should match the number of antennas, # and cover both polarisations. @@ -312,24 +362,6 @@ class TestStatisticsWriter(base.TestCase): (6, 6), f["XST_2021-09-13T13:21:32.000+00:00"].shape ) - def test_bst(self): - with TemporaryDirectory() as tmpdir: - new_sys_argv = [ - sys.argv[0], - "--mode", - "BST", - "--file", - dirname(__file__) + "/SDP_BST_statistics_packets.bin", - "--output_dir", - tmpdir, - ] - with mock.patch.object(entry.sys, "argv", new_sys_argv): - with self.assertRaises(SystemExit): - entry.main() - - # check if file was written - self.assertTrue(isfile(f"{tmpdir}/BST_2022-05-20-11-08-44.h5")) - class TestDictToHdf5Attrs(base.TestCase): def test_empty_dict(self): diff --git a/tests/test_devices.py b/tests/test_devices.py index 741cca56377a27dcc32287fd6cc976c4bf8f94c5..780a85cc93355e0fd5158543b6db835a68773586 100644 --- a/tests/test_devices.py +++ b/tests/test_devices.py @@ -285,6 +285,8 @@ class FakeAntennaFieldDeviceProxy: RCU_band_select_R = [1] * 3 RCU_DTH_on_R = [False] * 3 RCU_DTH_freq_R = [0.0] * 3 + RCU_PCB_ID_R = [[1, 1]] * 48 + RCU_PCB_version_R = [["version", "version"]] * 48 HBAT_PWR_on_R = [] Frequency_Band_R = [] @@ -323,3 +325,18 @@ class FakeDigitalBeamDeviceProxy: def __getattr__(self, attrname): return getattr(self, attrname) + + +class FakeStationManagerDeviceProxy: + """DeviceProxy that mocks access to a StationManager device.""" + + station_name_R = "DevStation" + + def __init__(self, name): + self._name = name + + def name(self): + return self._name + + def __getattr__(self, attrname): + return getattr(self, attrname)