Skip to content
Snippets Groups Projects
Commit 3a85d92c authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'main' into L2SS-1249-fix-version-bug

parents 250fc6cd 54458772
No related branches found
No related tags found
1 merge request!41L2SS-1249: fix version bug
......@@ -105,6 +105,7 @@ tox -e debug tests.requests.test_prometheus
```
## Releasenotes
- 0.14.2 - Added `station_name` attribute to Hdf5 file header
- 0.14.1 - Added `beamlet.subband_select_RW` attribute to BSTHdf5Writer
- 0.14 - Added new attributes to statistics HDF file as well as documentation
- 0.13 - Added lazy connection behavior to `devices.LofarDeviceProxy` class
......
0.14.1
0.14.2
......@@ -8,7 +8,7 @@
Models the structure of an HDF statistics file.
"""
import inspect
from typing import Dict
from typing import Dict, List
from numpy import ndarray
......@@ -186,7 +186,7 @@ class StatisticsFileHeader:
antenna_reference_itrf: str = attribute(optional=True)
""" Absolute reference position of each tile, in ITRF (XYZ) """
rcu_attenuator_db: [float] = attribute(name="rcu_attenuator_dB", optional=True)
rcu_attenuator_db: List[float] = attribute(name="rcu_attenuator_dB", optional=True)
""" Amount of dB with which each antenna signal must be adjusted to line up. """
rcu_band_select: float = attribute(optional=True)
......@@ -201,7 +201,7 @@ class StatisticsFileHeader:
fpga_firmware_version: str = attribute(optional=True)
fpga_hardware_version: str = attribute(optional=True)
rcu_pcb_id: str = attribute(optional=True)
rcu_pcb_id: int = attribute(optional=True)
rcu_pcb_version: str = attribute(optional=True)
def __eq__(self, other):
......
......@@ -4,13 +4,14 @@
"""Statistics writer parser and executor"""
# too-many-locals, broad-except, raise-missing-from,
# too-many-branches, too-many-arguments
# pylint: disable=R0914, W0703, W0707, R0912, R0913
# too-many-branches
# pylint: disable=R0914, W0703, W0707, R0912
import argparse
import logging
import sys
import time
from typing import Dict
from tango import DeviceProxy
......@@ -132,6 +133,14 @@ def _create_parser():
default="",
help="Digitalbeam device to collect data for",
)
parser.add_argument(
"-SM",
"--stationmanager",
type=str,
choices=["", "1"],
default="1",
help="StationManager device to collect data for",
)
return parser
......@@ -150,10 +159,7 @@ def _create_writer(
interval,
output_dir,
decimation,
antennafield_device: DeviceProxy = None,
sdp_device: DeviceProxy = None,
tilebeam_device: DeviceProxy = None,
digitalbeam_device: DeviceProxy = None,
devices: Dict[str, DeviceProxy],
):
"""Create the writer"""
if mode == "XST":
......@@ -161,25 +167,21 @@ def _create_writer(
new_file_time_interval=interval,
file_location=output_dir,
decimation_factor=decimation,
antennafield_device=antennafield_device,
sdp_device=sdp_device,
tilebeam_device=tilebeam_device,
devices=devices,
)
if mode == "SST":
return SstHdf5Writer(
new_file_time_interval=interval,
file_location=output_dir,
decimation_factor=decimation,
antennafield_device=antennafield_device,
sdp_device=sdp_device,
tilebeam_device=tilebeam_device,
devices=devices,
)
if mode == "BST":
return BstHdf5Writer(
new_file_time_interval=interval,
file_location=output_dir,
decimation_factor=decimation,
digitalbeam_device=digitalbeam_device,
devices=devices,
)
logger.fatal("Invalid mode: %s", mode)
sys.exit(1)
......@@ -271,37 +273,48 @@ def main():
logger.setLevel(logging.DEBUG)
logger.debug("Setting loglevel to DEBUG")
# Create tango devices dictionary
devices = {
"antennafield": None,
"sdp": None,
"tilebeam": None,
"digitalbeam": None,
"stationmanager": None,
}
if args.antennafield:
antennafield_device = _get_tango_device(
tango_disabled, host, f"STAT/AntennaField/{args.antennafield}"
)
devices["antennafield"] = antennafield_device
if antennafield_device and filename:
logger.warning(
"Combining live metadata from AntennaField \
device with statistics read from disk."
)
else:
antennafield_device = None
if args.sdp:
sdp_device = _get_tango_device(tango_disabled, host, f"STAT/SDP/{args.sdp}")
else:
sdp_device = None
devices["sdp"] = sdp_device
if args.tilebeam:
tilebeam_device = _get_tango_device(
tango_disabled, host, f"STAT/TileBeam/{args.tilebeam}"
)
else:
tilebeam_device = None
devices["tilebeam"] = tilebeam_device
if args.digitalbeam:
digitalbeam_device = _get_tango_device(
tango_disabled, host, f"STAT/DigitalBeam/{args.digitalbeam}"
)
else:
digitalbeam_device = None
devices["digitalbeam"] = digitalbeam_device
if args.stationmanager:
stationmanager_device = _get_tango_device(
tango_disabled, host, f"STAT/StationManager/{args.stationmanager}"
)
devices["stationmanager"] = stationmanager_device
# creates the TCP receiver that is given to the writer
receiver = _create_receiver(filename, host, port)
......@@ -312,10 +325,7 @@ def main():
interval,
output_dir,
decimation,
antennafield_device,
sdp_device,
tilebeam_device,
digitalbeam_device,
devices,
)
# start looping
......
......@@ -11,7 +11,7 @@ import logging
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from itertools import chain
from typing import TypeVar, List
from typing import TypeVar, List, Dict
# python hdf5
import h5py
......@@ -49,9 +49,15 @@ __all__ = [
def _get_station_version(device: DeviceProxy) -> str:
"""Retrieve the Lofar Station Control version"""
try:
if device is not None:
return device.version_R
except Exception:
return ""
def _get_station_name(device: DeviceProxy) -> str:
"""Retrieve the Station name from the StationManager device"""
try:
return device.station_name_R
except Exception:
return ""
......@@ -126,10 +132,7 @@ class HDF5Writer(ABC):
file_location,
statistics_mode,
decimation_factor,
antennafield_device: DeviceProxy = None,
sdp_device: DeviceProxy = None,
tilebeam_device: DeviceProxy = None,
digitalbeam_device: DeviceProxy = None,
devices: Dict[str, DeviceProxy],
):
# all variables that deal with the matrix that's currently being decoded
self.file: StatisticsDataFile = None
......@@ -156,10 +159,11 @@ class HDF5Writer(ABC):
self.mode = statistics_mode.upper()
# Set devices if any, defaults to None
self.antennafield_device = antennafield_device
self.sdp_device = sdp_device
self.tilebeam_device = tilebeam_device
self.digitalbeam_device = digitalbeam_device
self.antennafield_device = devices["antennafield"]
self.sdp_device = devices["sdp"]
self.tilebeam_device = devices["tilebeam"]
self.digitalbeam_device = devices["digitalbeam"]
self.stationmanager_device = devices["stationmanager"]
# By default, select all the values from SDP
self.antenna_selection: List[int] = None
......@@ -171,7 +175,9 @@ class HDF5Writer(ABC):
self.antennafield_device.Antenna_to_SDP_Mapping_R
)
except DevFailed:
logger.exception("Failed to read from %s", antennafield_device.name())
logger.exception(
"Failed to read from %s", devices["antennafield"].name()
)
else:
# select the values from SDP that represent the antennas in this field
self.antenna_selection = list(
......@@ -189,7 +195,7 @@ class HDF5Writer(ABC):
def set_file_header(self):
"""Returns the header fields per HDF5 file."""
# self.file.station_name
self.file.station_name = _get_station_name(self.stationmanager_device)
self.file.station_version = _get_station_version(self.antennafield_device)
self.file.writer_version = _get_writer_version()
self.file.mode = self.mode
......@@ -205,6 +211,8 @@ class HDF5Writer(ABC):
self.file.rcu_band_select = self.antennafield_device.RCU_band_select_R
self.file.rcu_dth_on = self.antennafield_device.RCU_DTH_on_R
self.file.rcu_dth_freq = self.antennafield_device.RCU_DTH_freq_R
self.file.rcu_pcb_id = self.antennafield_device.RCU_PCB_ID_R
self.file.rcu_pcb_version = self.antennafield_device.RCU_PCB_version_R
self.file.antenna_quality = (
self.antennafield_device.Antenna_Quality_str_R
) # noqa
......@@ -514,19 +522,14 @@ class SstHdf5Writer(HDF5Writer):
new_file_time_interval,
file_location,
decimation_factor,
antennafield_device: DeviceProxy = None,
sdp_device: DeviceProxy = None,
tilebeam_device: DeviceProxy = None,
devices: Dict[str, DeviceProxy],
):
super().__init__(
new_file_time_interval,
file_location,
HDF5Writer.SST_MODE,
decimation_factor,
antennafield_device=antennafield_device,
sdp_device=sdp_device,
tilebeam_device=tilebeam_device,
digitalbeam_device=None,
devices=devices,
)
def decoder(self, packet):
......@@ -561,17 +564,14 @@ class BstHdf5Writer(HDF5Writer):
new_file_time_interval,
file_location,
decimation_factor,
digitalbeam_device: DeviceProxy = None,
devices: Dict[str, DeviceProxy],
):
super().__init__(
new_file_time_interval,
file_location,
HDF5Writer.BST_MODE,
decimation_factor,
antennafield_device=None,
sdp_device=None,
tilebeam_device=None,
digitalbeam_device=digitalbeam_device,
devices=devices,
)
def decoder(self, packet):
......@@ -596,9 +596,7 @@ class XstHdf5Writer(HDF5Writer):
new_file_time_interval,
file_location,
decimation_factor,
antennafield_device,
sdp_device,
tilebeam_device,
devices: Dict[str, DeviceProxy],
subband_index,
):
super().__init__(
......@@ -606,10 +604,7 @@ class XstHdf5Writer(HDF5Writer):
file_location,
HDF5Writer.XST_MODE,
decimation_factor,
antennafield_device,
tilebeam_device,
sdp_device,
digitalbeam_device=None,
devices=devices,
)
self.subband_index = subband_index
......@@ -659,9 +654,7 @@ class ParallelXstHdf5Writer:
new_file_time_interval,
file_location,
decimation_factor,
antennafield_device,
sdp_device,
tilebeam_device,
devices: Dict[str, DeviceProxy],
):
# maintain a dedicated HDF5Writer per subband
self.writers = {}
......@@ -673,9 +666,7 @@ class ParallelXstHdf5Writer:
new_file_time_interval,
file_location,
decimation_factor,
antennafield_device,
sdp_device,
tilebeam_device,
devices,
subband,
)
......
......@@ -6,23 +6,21 @@
# See LICENSE.txt for more info.
from unittest import mock
from tests import base
import concurrent.futures
from json import loads
from tests import base
from lofar_station_client.observation.multi_station_observation import (
MultiStationObservation,
)
import concurrent.futures
SPEC_DICT = loads(
"""
{
"observation_id": 12345,
"stop_time": "2106-02-07T00:00:00",
"antenna_set": "ALL",
"antenna_mask": [0,1,2,9],
"filter": "HBA_110_190",
"SAPs": [{
......
......@@ -4,6 +4,7 @@
from os.path import dirname, isfile
from tempfile import TemporaryDirectory
from unittest import mock
from typing import Tuple
import sys
import h5py
import numpy
......@@ -18,6 +19,7 @@ from tests.test_devices import (
FakeAntennaFieldDeviceProxy,
FakeOffAntennaFieldDeviceProxy,
FakeDigitalBeamDeviceProxy,
FakeStationManagerDeviceProxy,
)
from tests import base
......@@ -31,6 +33,8 @@ class TestStatisticsReaderWriter(base.TestCase):
return FakeAntennaFieldDeviceProxy(device_name)
if device_name == "STAT/DigitalBeam/HBA":
return FakeDigitalBeamDeviceProxy(device_name)
if device_name == "STAT/StationManager/1":
return FakeStationManagerDeviceProxy
raise ValueError(
f"Device not mocked, and thus not available in this test: {device_name}"
)
......@@ -39,16 +43,19 @@ class TestStatisticsReaderWriter(base.TestCase):
"""Return our mocked DeviceProxies that simulate a device that is off"""
if device_name == "STAT/AntennaField/LBA":
return FakeOffAntennaFieldDeviceProxy(device_name)
if device_name == "STAT/StationManager/1":
return FakeStationManagerDeviceProxy
raise ValueError(
f"Device not mocked, and thus not available in this test: {device_name}"
)
class TestStatisticsReaderWriterSST(TestStatisticsReaderWriter):
"""TestStatistics class for SST-mode"""
def _run_writer_reader(
self, tmpdir: str, writer_argv: list
) -> (StatisticsData, StatisticsFileHeader):
) -> Tuple[StatisticsData, StatisticsFileHeader]:
"""Run the statistics writer with the given arguments,
and read and return the output."""
# default arguments for statistics writer
......@@ -97,6 +104,7 @@ class TestStatisticsReaderWriterSST(TestStatisticsReaderWriter):
def test_header_info(self):
"""Test whether the header info are inserted and collected in the proper way"""
with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device):
with TemporaryDirectory() as tmpdir:
_, file_header = self._run_writer_reader(tmpdir, [])
......@@ -144,11 +152,27 @@ class TestStatisticsReaderWriterSST(TestStatisticsReaderWriter):
):
_ = self._run_writer_reader(tmpdir, writer_argv)
def test_station_name(self):
with TemporaryDirectory() as tmpdir:
writer_argv = [
"--stationmanager",
"1",
]
with mock.patch.object(
entry, "_get_tango_device", self._mock_get_tango_device
):
_, file_header = self._run_writer_reader(tmpdir, writer_argv)
self.assertEqual("DevStation", file_header.station_name)
class TestStatisticsReaderWriterBST(TestStatisticsReaderWriter):
"""TestStatistics class for BST-mode"""
def _run_writer_reader(
self, tmpdir: str, writer_argv: list
) -> (StatisticsData, StatisticsFileHeader):
) -> Tuple[StatisticsData, StatisticsFileHeader]:
"""Run the statistics writer with the given arguments,
and read and return the output."""
# default arguments for statistics writer
......@@ -206,9 +230,31 @@ class TestStatisticsReaderWriterBST(TestStatisticsReaderWriter):
# Test some AntennField attributes, whether they match our mock
self.assertListEqual(list(range(0, 488)), file_header.subbands.tolist())
def test_bst(self):
with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device):
with TemporaryDirectory() as tmpdir:
new_sys_argv = [
sys.argv[0],
"--mode",
"BST",
"--file",
dirname(__file__) + "/SDP_BST_statistics_packets.bin",
"--output_dir",
tmpdir,
]
with mock.patch.object(entry.sys, "argv", new_sys_argv):
with self.assertRaises(SystemExit):
entry.main()
# check if file was written
self.assertTrue(isfile(f"{tmpdir}/BST_2022-05-20-11-08-44.h5"))
class TestStatisticsWriterXST(TestStatisticsReaderWriter):
"""TestStatistics class for XST-mode"""
class TestStatisticsWriter(base.TestCase):
def test_xst(self):
with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device):
with TemporaryDirectory() as tmpdir:
new_sys_argv = [
sys.argv[0],
......@@ -242,8 +288,8 @@ class TestStatisticsWriter(base.TestCase):
self.assertIn("XST_2021-09-13T13:21:36.999+00:00", dict(f.items()))
self.assertIn("XST_2021-09-13T13:21:38.000+00:00", dict(f.items()))
# check dataset dimensions, should match the maximum number of antennas,
# and cover both polarisations.
# check dataset dimensions, should match the maximum
# number of antennas, and cover both polarisations.
self.assertEqual(
(192, 192), f["XST_2021-09-13T13:21:32.000+00:00"].shape
)
......@@ -254,13 +300,15 @@ class TestStatisticsWriter(base.TestCase):
)
def test_xst_multiple_subbands(self):
with mock.patch.object(entry, "_get_tango_device", self._mock_get_tango_device):
with TemporaryDirectory() as tmpdir:
new_sys_argv = [
sys.argv[0],
"--mode",
"XST",
"--file",
dirname(__file__) + "/SDP_XST_statistics_packets_multiple_subbands.bin",
dirname(__file__)
+ "/SDP_XST_statistics_packets_multiple_subbands.bin",
"--output_dir",
tmpdir,
]
......@@ -305,6 +353,8 @@ class TestStatisticsWriter(base.TestCase):
self.assertIn("rcu_band_select", dict(f.attrs))
self.assertIn("rcu_dth_on", dict(f.attrs))
self.assertIn("rcu_dth_freq", dict(f.attrs))
self.assertIn("rcu_pcb_id", dict(f.attrs))
self.assertIn("rcu_pcb_version", dict(f.attrs))
# check dataset dimensions, should match the number of antennas,
# and cover both polarisations.
......@@ -312,24 +362,6 @@ class TestStatisticsWriter(base.TestCase):
(6, 6), f["XST_2021-09-13T13:21:32.000+00:00"].shape
)
def test_bst(self):
with TemporaryDirectory() as tmpdir:
new_sys_argv = [
sys.argv[0],
"--mode",
"BST",
"--file",
dirname(__file__) + "/SDP_BST_statistics_packets.bin",
"--output_dir",
tmpdir,
]
with mock.patch.object(entry.sys, "argv", new_sys_argv):
with self.assertRaises(SystemExit):
entry.main()
# check if file was written
self.assertTrue(isfile(f"{tmpdir}/BST_2022-05-20-11-08-44.h5"))
class TestDictToHdf5Attrs(base.TestCase):
def test_empty_dict(self):
......
......@@ -285,6 +285,8 @@ class FakeAntennaFieldDeviceProxy:
RCU_band_select_R = [1] * 3
RCU_DTH_on_R = [False] * 3
RCU_DTH_freq_R = [0.0] * 3
RCU_PCB_ID_R = [[1, 1]] * 48
RCU_PCB_version_R = [["version", "version"]] * 48
HBAT_PWR_on_R = []
Frequency_Band_R = []
......@@ -323,3 +325,18 @@ class FakeDigitalBeamDeviceProxy:
def __getattr__(self, attrname):
return getattr(self, attrname)
class FakeStationManagerDeviceProxy:
"""DeviceProxy that mocks access to a StationManager device."""
station_name_R = "DevStation"
def __init__(self, name):
self._name = name
def name(self):
return self._name
def __getattr__(self, attrname):
return getattr(self, attrname)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment