Skip to content
Snippets Groups Projects
Commit 9229637b authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-1451: Infer device names and default port number from the specified antenna field

parent e45a492a
No related branches found
No related tags found
1 merge request!55L2SS-1451: Infer device names and default port number from the specified antenna field
......@@ -105,6 +105,7 @@ tox -e debug tests.requests.test_prometheus
```
## Releasenotes
- 0.15.1 - Infer device names and default port from the specified antenna-field name
- 0.15.0 - Add all_connected method to MultiStationObservation
- 0.14.8 - Optimizing hdf dictionary writing to cause less memory and IO overhead
- 0.14.7 - Refactor LazyDeviceProxy
......
0.15.0
0.15.1
......@@ -19,9 +19,15 @@
# number of polarisations per antenna (X and y polarisations)
N_pol = 2
# maximum number of fpgas supported by SDP
N_pn = 16
# antennas per FPGA
A_pn = 6
# signal inputs per FPGA
S_pn = A_pn * N_pol
RCU2L = [0, 1, 2, 3, 4, 5]
RCU2H = [8, 9]
RCU2HPWR = [8] # Should be even numbers
......
......@@ -105,41 +105,9 @@ def _create_parser():
"-A",
"--antennafield",
type=str,
choices=["", "LBA", "HBA", "HBA0", "HBA1"],
default="",
help="Antenna field to collect data for",
)
parser.add_argument(
"-S",
"--sdp",
type=str,
choices=["", "1"],
default="",
help="SDP device to collect data for",
)
parser.add_argument(
"-TB",
"--tilebeam",
type=str,
choices=["", "LBA", "HBA", "HBA0", "HBA1"],
default="",
help="Tilebeam device to collect data for",
)
parser.add_argument(
"-DB",
"--digitalbeam",
type=str,
choices=["", "LBA", "HBA", "HBA0", "HBA1"],
default="",
help="Digitalbeam device to collect data for",
)
parser.add_argument(
"-SM",
"--stationmanager",
type=str,
choices=["", "1"],
default="1",
help="StationManager device to collect data for",
choices=["LBA", "HBA", "HBA0", "HBA1"],
default="LBA",
help="Antenna field to collect data for (default: %(default))",
)
return parser
......@@ -266,8 +234,13 @@ def main():
)
if port == 0:
default_ports = {"SST": 5101, "XST": 5102, "BST": 5103}
port = default_ports[mode]
default_ports = {
"LBA": {"SST": 5101, "XST": 5102, "BST": 5103},
"HBA": {"SST": 5111, "XST": 5112, "BST": 5113},
"HBA0": {"SST": 5111, "XST": 5112, "BST": 5113},
"HBA1": {"SST": 5121, "XST": 5122, "BST": 5123},
}
port = default_ports[args.antennafield or "LBA"][mode]
if debug:
logger.setLevel(logging.DEBUG)
......@@ -294,25 +267,24 @@ def main():
device with statistics read from disk."
)
if args.sdp:
sdp_device = _get_tango_device(tango_disabled, host, f"STAT/SDP/{args.sdp}")
devices["sdp"] = sdp_device
if args.tilebeam:
if args.antennafield != "LBA":
tilebeam_device = _get_tango_device(
tango_disabled, host, f"STAT/TileBeam/{args.tilebeam}"
tango_disabled, host, f"STAT/TileBeam/{args.antennafield}"
)
devices["tilebeam"] = tilebeam_device
if args.digitalbeam:
sdp_device = _get_tango_device(
tango_disabled, host, f"STAT/SDP/{args.antennafield}"
)
devices["sdp"] = sdp_device
digitalbeam_device = _get_tango_device(
tango_disabled, host, f"STAT/DigitalBeam/{args.digitalbeam}"
tango_disabled, host, f"STAT/DigitalBeam/{args.antennafield}"
)
devices["digitalbeam"] = digitalbeam_device
if args.stationmanager:
stationmanager_device = _get_tango_device(
tango_disabled, host, f"STAT/StationManager/{args.stationmanager}"
tango_disabled, host, "STAT/StationManager/1"
)
devices["stationmanager"] = stationmanager_device
......
......@@ -19,7 +19,7 @@ import numpy
import pytz
from tango import DeviceProxy, DevFailed
from lofar_station_client.dts.constants import A_pn, N_pol
from lofar_station_client.dts.constants import A_pn, N_pol, CST_N_SUB
from lofar_station_client.file_access.hdf._hdf_writers import HdfFileWriter, create_hdf5
from lofar_station_client.statistics import writer as stats_writer
from lofar_station_client.statistics.collector import BSTCollector
......@@ -386,7 +386,7 @@ class HDF5Writer(ABC):
"Failed to read from %s", self.antennafield_device.name()
)
if self.sdp_device:
if self.sdp_device and self.antenna_sdp_mapping:
try:
nyquist_zones = self.sdp_device.nyquist_zone_RW
fpga_spectral_inversion = self.sdp_device.FPGA_spectral_inversion_R
......@@ -403,7 +403,7 @@ class HDF5Writer(ABC):
(len(self.antenna_sdp_mapping), N_pol), None
)
matrix.subband_frequencies = numpy.empty(
(len(self.antenna_sdp_mapping), N_pol), None
(len(self.antenna_sdp_mapping), N_pol, CST_N_SUB), None
)
for antenna_nr, (fpga_nr, input_nr) in enumerate(
self.antenna_sdp_mapping
......
......@@ -9,6 +9,7 @@ import sys
import h5py
import numpy
from lofar_station_client.dts.constants import N_pol
from lofar_station_client.statistics.statistics_data import (
StatisticsData,
StatisticsFileHeader,
......@@ -19,6 +20,7 @@ from tests.test_devices import (
FakeAntennaFieldDeviceProxy,
FakeOffAntennaFieldDeviceProxy,
FakeDigitalBeamDeviceProxy,
FakeSDPDeviceProxy,
FakeStationManagerDeviceProxy,
)
from tests import base
......@@ -31,10 +33,12 @@ class TestStatisticsReaderWriter(base.TestCase):
"""Return our mocked DeviceProxies"""
if device_name == "STAT/AntennaField/LBA":
return FakeAntennaFieldDeviceProxy(device_name)
if device_name == "STAT/DigitalBeam/HBA":
if device_name == "STAT/DigitalBeam/LBA":
return FakeDigitalBeamDeviceProxy(device_name)
if device_name == "STAT/StationManager/1":
return FakeStationManagerDeviceProxy
if device_name == "STAT/SDP/LBA":
return FakeSDPDeviceProxy(device_name)
raise ValueError(
f"Device not mocked, and thus not available in this test: {device_name}"
)
......@@ -43,8 +47,12 @@ class TestStatisticsReaderWriter(base.TestCase):
"""Return our mocked DeviceProxies that simulate a device that is off"""
if device_name == "STAT/AntennaField/LBA":
return FakeOffAntennaFieldDeviceProxy(device_name)
if device_name == "STAT/DigitalBeam/LBA":
return FakeDigitalBeamDeviceProxy(device_name)
if device_name == "STAT/StationManager/1":
return FakeStationManagerDeviceProxy
if device_name == "STAT/SDP/LBA":
return FakeSDPDeviceProxy(device_name)
raise ValueError(
f"Device not mocked, and thus not available in this test: {device_name}"
)
......@@ -154,10 +162,7 @@ class TestStatisticsReaderWriterSST(TestStatisticsReaderWriter):
def test_station_name(self):
with TemporaryDirectory() as tmpdir:
writer_argv = [
"--stationmanager",
"1",
]
writer_argv = []
with mock.patch.object(
entry, "_get_tango_device", self._mock_get_tango_device
......@@ -218,8 +223,8 @@ class TestStatisticsReaderWriterBST(TestStatisticsReaderWriter):
def test_insert_tango_BST_statistics(self):
with TemporaryDirectory() as tmpdir:
writer_argv = [
"--digitalbeam",
"HBA",
"--antennafield",
"LBA",
]
with mock.patch.object(
......@@ -288,10 +293,12 @@ class TestStatisticsWriterXST(TestStatisticsReaderWriter):
self.assertIn("XST_2021-09-13T13:21:36.999+00:00", dict(f.items()))
self.assertIn("XST_2021-09-13T13:21:38.000+00:00", dict(f.items()))
# check dataset dimensions, should match the maximum
# check dataset dimensions, should match the actual
# number of antennas, and cover both polarisations.
nr_antennas = FakeAntennaFieldDeviceProxy.nr_antennas_R
self.assertEqual(
(192, 192), f["XST_2021-09-13T13:21:32.000+00:00"].shape
(nr_antennas * N_pol, nr_antennas * N_pol),
f["XST_2021-09-13T13:21:32.000+00:00"].shape,
)
# check dataset header
......@@ -322,7 +329,9 @@ class TestStatisticsWriterXST(TestStatisticsReaderWriter):
def test_xst_with_antennafield(self):
with TemporaryDirectory() as tmpdir:
with mock.patch.object(entry, "DeviceProxy", FakeAntennaFieldDeviceProxy):
with mock.patch.object(
entry, "_get_tango_device", self._mock_get_tango_device
):
new_sys_argv = [
sys.argv[0],
"--mode",
......
......@@ -11,6 +11,7 @@ from tango.test_context import MultiDeviceTestContext
import lofar_station_client
from lofar_station_client import devices
from lofar_station_client.devices import LofarDeviceProxy
from lofar_station_client.dts.constants import N_pn, S_pn, CST_N_SUB, CST_FS
from tests import base
......@@ -282,17 +283,18 @@ class RecvDeviceProxyTest(base.TestCase):
class FakeAntennaFieldDeviceProxy:
"""DeviceProxy that mocks access to an AntennaField device."""
nr_antennas_R = 3
Antenna_to_SDP_Mapping_R = [[0, 0], [0, 1], [0, 2]]
Antenna_Names_R = ["Aap", "Noot", "Mies"]
Antenna_Reference_ITRF_R = [[0, 0, 0]] * 3
Antenna_Quality_str_R = ["OK"] * 3
Antenna_Reference_ITRF_R = [[0, 0, 0]] * nr_antennas_R
Antenna_Quality_str_R = ["OK"] * nr_antennas_R
Antenna_Type_R = "LBA"
Antenna_Usage_Mask_R = [True] * 3
Frequency_Band_RW = ["LBA_10_90"] * 3
Antenna_Usage_Mask_R = [True] * nr_antennas_R
Frequency_Band_RW = ["LBA_10_90"] * nr_antennas_R
RCU_attenuator_dB_R = [0, 1, 2]
RCU_band_select_R = [1] * 3
RCU_DTH_on_R = [False] * 3
RCU_DTH_freq_R = [0.0] * 3
RCU_band_select_R = [1] * nr_antennas_R
RCU_DTH_on_R = [False] * nr_antennas_R
RCU_DTH_freq_R = [0.0] * nr_antennas_R
RCU_PCB_ID_R = [[1, 1]] * 48
RCU_PCB_version_R = [["version", "version"]] * 48
HBAT_PWR_on_R = []
......@@ -318,6 +320,28 @@ class FakeOffAntennaFieldDeviceProxy:
raise DevFailed("Device is off")
class FakeSDPDeviceProxy:
"""DeviceProxy that mocks access to an DigitalBeam device."""
FPGA_firmware_version_R = ["firmware version"] * N_pn
FPGA_hardware_version_R = ["hardware version"] * N_pn
nyquist_zone_RW = numpy.array([[0] * S_pn] * N_pn, dtype=numpy.int64)
FPGA_spectral_inversion_R = numpy.array([[False] * S_pn] * N_pn, dtype=bool)
subband_frequency_R = numpy.array(
[[sb * CST_FS / CST_N_SUB for sb in range(CST_N_SUB)]] * (S_pn * N_pn),
dtype=numpy.float64,
)
def __init__(self, name):
self._name = name
def name(self):
return self._name
def __getattr__(self, attrname):
return getattr(self, attrname)
class FakeDigitalBeamDeviceProxy:
"""DeviceProxy that mocks access to an DigitalBeam device."""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment