Skip to content
Snippets Groups Projects
Commit abe197d4 authored by Corné Lukken's avatar Corné Lukken Committed by Hannes Feldt
Browse files

L2SS-1740: Move modules to prepare for statistics metadata

parent 21bc5543
No related branches found
No related tags found
1 merge request!89L2SS-1740: Move modules to prepare for statistics metadata
Showing
with 391 additions and 326 deletions
......@@ -82,18 +82,9 @@ Suppose you captured statistics or beamlets in a file called `packets.raw`. You
and print a brief summary per packet using:
```python
from lofar_station_client.statistics.receiver import FileReceiver
from lofar_station_client.statistics import receivers
for packet in FileReceiver("packets.raw"):
print(packet)
```
You can also process them live from a station, for example:
```python
from lofar_station_client.statistics.receiver import TCPReceiver
for packet in TCPReceiver("cs001c.control.lofar", 5101):
for packet in receivers.create("file:///packets.raw"):
print(packet)
```
......@@ -131,6 +122,7 @@ tox -e debug tests.requests.test_prometheus
## Release notes
- 0.18.7 - Add support for various ZeroMQ package receivers
- 0.18.6 - Compatability with new black versions
- 0.18.5 - Compatability with python 3.10 and higher
- 0.18.4 - Compatability with PyTango 9.5.0
......
0.18.6
0.18.7
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
# pylint: disable-all
......@@ -8,32 +8,6 @@
Contains required methods missing in older python versions.
"""
try:
# Python >=3.8 should have these functions already
from typing import get_args
from typing import get_origin
except ImportError:
import typing
import collections
def get_origin(tp):
"""Copied from the Python 3.8 typing module"""
if isinstance(tp, typing._GenericAlias):
return tp.__origin__
if tp is typing.Generic:
return typing.Generic
return None
def get_args(tp):
"""Copied from the Python 3.8 typing module"""
if isinstance(tp, typing._GenericAlias):
res = tp.__args__
if get_origin(tp) is collections.abc.Callable and res[0] is not Ellipsis:
res = (list(res[:-1]), res[-1])
return res
return ()
try:
from inspect import get_annotations
except ImportError:
......
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""
General utils
"""
from typing import Optional, Type, get_type_hints
from typing import Optional, Type, get_type_hints, get_args, get_origin
from numpy import ndarray
from ._compat_utils import get_args, get_origin
from ._monitoring import MonitoredWrapper
......
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""
......@@ -6,11 +6,11 @@ Utils to handle transformation of HDF5 specific classes to pythonic objects
"""
import inspect
from collections.abc import MutableMapping
from typing import Type, TypeVar
from typing import Type, TypeVar, get_origin
from numpy import ndarray
from .._compat_utils import get_origin, get_annotations
from .._compat_utils import get_annotations
T = TypeVar("T")
......
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""
......@@ -7,7 +7,7 @@ Contains classes to handle file reading
import inspect
import weakref
from inspect import getattr_static
from typing import TypeVar, Type, List, Dict
from typing import TypeVar, Type, List, Dict, get_origin
import h5py
from numpy import ndarray, zeros
......@@ -17,7 +17,6 @@ from ._hdf5_utils import (
_assert_is_dataset,
)
from .._attribute_def import AttributeDef
from .._compat_utils import get_origin
from .._lazy_dict import lazy_dict
from .._member_def import MemberDef
from .._readers import FileReader, DataReader
......@@ -169,7 +168,7 @@ class HdfDataReader(DataReader):
reader = cls.detect_reader(target_type, data_reader)
result = lazy_dict(dict_type, lambda k: reader(value[k]))
for k in value.keys():
result[k] = lambda k=k: reader(value[k])
result[k] = lambda n=k: reader(value[n])
if dict_type is not dict:
setattr(result, "_data_reader", cls(data_reader.file_reader, value))
return result
......
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""
Contains classes to handle file writing
"""
from inspect import getattr_static
from typing import TypeVar, Type, Dict
from typing import TypeVar, Type, Dict, get_origin
import h5py
from numpy import ndarray
......@@ -17,7 +17,6 @@ from ._hdf5_utils import (
_assert_is_dataset,
)
from ._hdf_readers import HdfFileReader, HdfDataReader
from .._compat_utils import get_origin
from .._lazy_dict import LazyDict
from .._utils import _wrap, _extract_base_type
from .._writers import FileWriter, DataWriter
......
......@@ -101,7 +101,7 @@ class StationObservationFuture:
if not self.connected:
return self._failed_future()
return self._control_proxy.stop_observation(self._id, wait=False)
return self._control_proxy.stop_observation_now(self._id, wait=False)
@property
def is_running(self) -> concurrent.futures:
......
......@@ -34,7 +34,7 @@ An example call could be:
```l2ss-statistics-writer --host localhost --port 1234--mode XST --debug```
This starts the script up to listen on localhost:1234 for XSTs with debug mode
on.
on.
## HFD5 structure
Statistics packets are collected by the StatisticsCollector in to a matrix. Once
......@@ -62,7 +62,7 @@ File
### reader
There is a statistics reader that is capable of parsing multiple HDF5 statistics
files in to a more easily usable format. It also allows for filtering between
files in to a more easily usable format. It also allows for filtering between
certain timestamps.
`statistics_reader.py` takes the following arguments:
......@@ -73,11 +73,6 @@ certain timestamps.
ex: `python3 statistics_reader.py --files SST_2021-10-04-07-36-52.h5 --end_time 2021-10-04#07:50:08.937+00:00`
This will parse all the statistics in the file `SST_2021-10-04-07-36-52.h5` up to the timestamp `2021-10-04#07:50:08.937+00:00`
This file can be used as both a testing tool and an example for dealing with HDF5 statistics.
This file can be used as both a testing tool and an example for dealing with HDF5 statistics.
The code serves can serve as a starting point for further development. To help with these purposes a bunch of simple
helper functions are provided.
### test server
There is a test server that will continuously send out the same statistics packet.
Its called `test_server.py`. Takes `--host`, `--port` and `--file` as optional input arguments.
Defaults to address `'127.0.0.1'`, port `65433` and file `devices_test_SDP_SST_statistics_packets.bin`
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""Collector classes for statistics"""
from ._bst import BSTCollector
from ._collector import StatisticsCollector
from ._sst import SSTCollector
from ._xst import XSTCollector
__all__ = ["BSTCollector", "StatisticsCollector", "SSTCollector", "XSTCollector"]
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""BST statistics collector"""
# logging use & lazy string, (too-few-public-methods)
# pylint: disable=W1203, R0903
import logging
import numpy
from lofar_station_client.dts.constants import N_pol
from lofar_station_client.statistics.packets import BSTPacket
from lofar_station_client.statistics.collectors._collector import StatisticsCollector
logger = logging.getLogger()
class BSTCollector(StatisticsCollector):
"""Class to process SST statistics packets."""
# beamlets = 488 * 2 for the x and y polarisations
MAX_BEAMLETS = 488
def _default_parameters(self):
defaults = super()._default_parameters()
defaults.update(
{
# Last value array we've constructed out of the packets
"bst_values": numpy.zeros(
(self.MAX_BEAMLETS, N_pol),
dtype=numpy.uint64,
),
"bst_timestamps": numpy.zeros(
(self.MAX_FPGAS,),
dtype=numpy.float64,
),
"integration_intervals": numpy.zeros(
(self.MAX_FPGAS,),
dtype=numpy.float32,
),
}
)
return defaults
def _parse_packet(self, packet: BSTPacket):
fpga_nr = self.gn_index_to_fpga_nr(packet.header.gn_index)
# number of beamlets in the packet
beamlets = packet.payload()
nr_beamlets = beamlets.shape[0]
first_beamlet = packet.header.data_id.beamlet_index
last_beamlet = first_beamlet + nr_beamlets
# determine which input this packet contains data for
if last_beamlet > self.MAX_BEAMLETS:
# packet describes an input that is out of bounds for us
raise ValueError(
f"Packet describes {nr_beamlets} beamlets starting at "
f"{packet.header.data_id.beamlet_index}, but we are limited "
f"to describing MAX_BEAMLETS={self.MAX_BEAMLETS}"
)
if packet.header.payload_error:
# cannot trust the data if a payload error is reported
self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1)
# don't raise, as packet is valid
return
# process the packet
self.parameters["nof_valid_payloads"][fpga_nr] += numpy.uint64(1)
self.parameters["bst_values"][first_beamlet:last_beamlet] = beamlets
self.parameters["bst_timestamps"][fpga_nr] = numpy.float64(
packet.timestamp.timestamp()
)
self.parameters["integration_intervals"][
fpga_nr
] = packet.header.integration_interval
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""Base statistic collector class for all types of statistics
This includes:
XSTCollector
BSTCollector
SSTCollector
"""
# logging use & lazy string, (too-few-public-methods)
# pylint: disable=W1203, R0903
import abc
import logging
import numpy
from lofar_station_client.statistics.packets import StatisticsPacket
logger = logging.getLogger()
class StatisticsCollector(abc.ABC):
"""Base class to process statistics packets into parameter matrices."""
# Maximum number of FPGAs we receive data from (used for diagnostics)
MAX_FPGAS = 16
def __init__(self):
self.parameters = self._default_parameters()
def _default_parameters(self):
"""Default parameters that all types of statistics contain"""
return {
"nof_packets": numpy.uint64(0),
# Packet count for packets that could not be parsed
"nof_invalid_packets": numpy.uint64(0),
# Number of packets received so far that we could parse correctly
# and do not have a payload error
"nof_valid_payloads": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),
# Packets that reported a payload error
"nof_payload_errors": numpy.zeros((self.MAX_FPGAS,), dtype=numpy.uint64),
# Full contents of the latest packet we deemed invalid.
"last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8),
# Track the last invalid packet
"last_invalid_packet_exception": "None",
# gn_index of FPGAs as reported in the packets, for verification purposes
"gn_indices": numpy.full((self.MAX_FPGAS,), -1, dtype=numpy.int16),
}
def gn_index_to_fpga_nr(self, gn_index: int) -> int:
"""Register an FPGA to the list of encountered gn_indices.
The FPGAs can carry any gn_index 0..255 but we only maintain
statistics for MAX_FPGAS of them. We therefor maintain a mapping
of which gn_indices we have encountered.
Returns the index of the registered gn_index within the list.
Throws a ValueError if more than MAX_FPGAS are encountered."""
def find_fpga_nr(gni):
indices = numpy.where(self.parameters["gn_indices"] == gni)[0]
if len(indices):
return indices[0]
raise ValueError(f"Could not find gn_index {gni}")
try:
return find_fpga_nr(gn_index)
except ValueError:
# FPGA not yet in the list
try:
new_fpga_nr = find_fpga_nr(-1)
except ValueError as ex:
# No room anymore! Throw exception
raise ValueError(
f"Cannot register data coming from FPGA {gn_index}, as "
f"the administration is filled with data from FPGAs "
f"{self.parameters['gn_indices']}"
) from ex
# Add this FPGA to the list
self.parameters["gn_indices"][new_fpga_nr] = gn_index
return new_fpga_nr
def process_packet(self, packet: StatisticsPacket):
"""Baseclass wrapper around performing parse_packet"""
self.parameters["nof_packets"] += numpy.uint64(1)
try:
self._parse_packet(packet)
except Exception as err:
self.parameters["last_invalid_packet"] = numpy.frombuffer(
packet.raw, dtype=numpy.uint8
)
self.parameters["nof_invalid_packets"] += numpy.uint64(1)
self.parameters["last_invalid_packet_exception"] = str(err)
raise ValueError("Could not parse statistics packet") from err
@abc.abstractmethod
def _parse_packet(self, packet: StatisticsPacket):
"""Update any information based on this packet."""
raise NotImplementedError
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""Collectors for all types of statistics
This includes:
XSTCollector
BSTCollector
SSTCollector
"""
# logging use & lazy string, (too-few-public-methods)
# pylint: disable=W1203, R0903
import logging
import numpy
from lofar_station_client.statistics.collectors._collector import StatisticsCollector
from lofar_station_client.statistics.packets import SSTPacket
logger = logging.getLogger()
class SSTCollector(StatisticsCollector):
"""Class to process SST statistics packets."""
# Maximum number of antenna inputs we support (used to determine array sizes)
MAX_INPUTS = 192
# Maximum number of subbands we support (used to determine array sizes)
MAX_SUBBANDS = 512
def __init__(
self, nr_signal_inputs: int = MAX_INPUTS, first_signal_input_index: int = 0
):
self.nr_signal_inputs = nr_signal_inputs
self.first_signal_input_index = first_signal_input_index
super().__init__()
def _default_parameters(self):
defaults = super()._default_parameters()
defaults.update(
{
# Last value array we've constructed out of the packets
"sst_values": numpy.zeros(
(self.nr_signal_inputs, self.MAX_SUBBANDS), dtype=numpy.uint64
),
"sst_timestamps": numpy.zeros(
(self.nr_signal_inputs,), dtype=numpy.float64
),
"integration_intervals": numpy.zeros(
(self.nr_signal_inputs,), dtype=numpy.float32
),
"subbands_calibrated": numpy.zeros(
(self.nr_signal_inputs,), dtype=bool
),
}
)
return defaults
def _parse_packet(self, packet: SSTPacket):
if not isinstance(packet, SSTPacket):
raise ValueError("Packet is not of type SSTPacket")
fpga_nr = self.gn_index_to_fpga_nr(packet.header.gn_index)
input_index = (
packet.header.data_id.signal_input_index - self.first_signal_input_index
)
# determine which input this packet contains data for
if not 0 <= input_index < self.nr_signal_inputs:
# packet describes an input that is out of bounds for us
raise ValueError(
f"Packet describes input {packet.header.data_id.signal_input_index}, "
f"but we are limited to describing {self.nr_signal_inputs} starting "
f"at index {self.first_signal_input_index}"
)
if packet.header.payload_error:
# cannot trust the data if a payload error is reported
self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1)
# don't raise, as packet is valid
return
# process the packet
self.parameters["nof_valid_payloads"][fpga_nr] += numpy.uint64(1)
self.parameters["sst_values"][input_index][
: packet.header.nof_statistics_per_packet
] = packet.payload()
self.parameters["sst_timestamps"][input_index] = numpy.float64(
packet.timestamp.timestamp()
)
self.parameters["integration_intervals"][
input_index
] = packet.header.integration_interval
self.parameters["subbands_calibrated"][
input_index
] = packet.header.subband_calibrated_flag
......@@ -12,199 +12,21 @@ This includes:
# logging use & lazy string, (too-few-public-methods)
# pylint: disable=W1203, R0903
import abc
import logging
import numpy
from lofar_station_client.dts.constants import N_pol
from lofar_station_client.math.baseline import baseline_from_index
from lofar_station_client.math.baseline import baseline_index
# TODO(Corne): Discuss moving to lofar_common_python library?
from lofar_station_client.math.baseline import nr_baselines
from lofar_station_client.statistics.packets import BSTPacket, StatisticsPacket
from lofar_station_client.statistics.packets import SSTPacket
from lofar_station_client.statistics.collectors._collector import StatisticsCollector
from lofar_station_client.statistics.packets import XSTPacket
logger = logging.getLogger()
class StatisticsCollector(abc.ABC):
"""Base class to process statistics packets into parameters matrices."""
# Maximum number of FPGAs we receive data from (used for diagnostics)
MAX_FPGAS = 16
def __init__(self):
self.parameters = self._default_parameters()
def _default_parameters(self):
"""Default parameters that all types of statistics contain"""
return {
"nof_packets": numpy.uint64(0),
# Packet count for packets that could not be parsed
"nof_invalid_packets": numpy.uint64(0),
# Full contents of the latest packet we deemed invalid.
"last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8),
# Track the last invalid packet
"last_invalid_packet_exception": "None",
# gn_index of FPGAs as reported in the packets, for verification purposes
"gn_indices": numpy.full((self.MAX_FPGAS,), -1, dtype=numpy.int16),
}
def gn_index_to_fpga_nr(self, gn_index: int) -> int:
"""Register an FPGA to the list of encountered gn_indices.
The FPGAs can carry any gn_index 0..255 but we only maintain
statistics for MAX_FPGAS of them. We therefor maintain a mapping
of which gn_indices we have encountered.
Returns the index of the registered gn_index within the list.
Throws a ValueError if more than MAX_FPGAS are encountered."""
def find_fpga_nr(gni):
indices = numpy.where(self.parameters["gn_indices"] == gni)[0]
if len(indices):
return indices[0]
raise ValueError(f"Could not find gn_index {gni}")
try:
return find_fpga_nr(gn_index)
except ValueError:
# FPGA not yet in the list
try:
new_fpga_nr = find_fpga_nr(-1)
except ValueError as ex:
# No room anymore! Throw exception
raise ValueError(
f"Cannot register data coming from FPGA {gn_index}, as "
f"the administration is filled with data from FPGAs "
f"{self.parameters['gn_indices']}"
) from ex
# Add this FPGA to the list
self.parameters["gn_indices"][new_fpga_nr] = gn_index
return new_fpga_nr
def process_packet(self, packet: StatisticsPacket):
"""Baseclass wrapper around performing parse_packet"""
self.parameters["nof_packets"] += numpy.uint64(1)
try:
self._parse_packet(packet)
except Exception as err:
self.parameters["last_invalid_packet"] = numpy.frombuffer(
packet.raw, dtype=numpy.uint8
)
self.parameters["nof_invalid_packets"] += numpy.uint64(1)
self.parameters["last_invalid_packet_exception"] = str(err)
raise ValueError("Could not parse statistics packet") from err
@abc.abstractmethod
def _parse_packet(self, packet: StatisticsPacket):
"""Update any information based on this packet."""
raise NotImplementedError
class SSTCollector(StatisticsCollector):
"""Class to process SST statistics packets."""
# Maximum number of antenna inputs we support (used to determine array sizes)
MAX_INPUTS = 192
# Maximum number of subbands we support (used to determine array sizes)
MAX_SUBBANDS = 512
def __init__(
self, nr_signal_inputs: int = MAX_INPUTS, first_signal_input_index: int = 0
):
self.nr_signal_inputs = nr_signal_inputs
self.first_signal_input_index = first_signal_input_index
super().__init__()
def _default_parameters(self):
defaults = super()._default_parameters()
defaults.update(
{
# Number of packets received so far that we could parse correctly and do
# not have a payload error
"nof_valid_payloads": numpy.zeros(
(self.MAX_FPGAS,), dtype=numpy.uint64
),
# Packets that reported a payload error
"nof_payload_errors": numpy.zeros(
(self.MAX_FPGAS,), dtype=numpy.uint64
),
# Last value array we've constructed out of the packets
"sst_values": numpy.zeros(
(self.nr_signal_inputs, self.MAX_SUBBANDS), dtype=numpy.uint64
),
"sst_timestamps": numpy.zeros(
(self.nr_signal_inputs,), dtype=numpy.float64
),
"integration_intervals": numpy.zeros(
(self.nr_signal_inputs,), dtype=numpy.float32
),
"subbands_calibrated": numpy.zeros(
(self.nr_signal_inputs,), dtype=bool
),
}
)
return defaults
def _parse_packet(self, packet: SSTPacket):
if not isinstance(packet, SSTPacket):
raise ValueError("Packet is not of type SSTPacket")
fpga_nr = self.gn_index_to_fpga_nr(packet.header.gn_index)
input_index = (
packet.header.data_id.signal_input_index - self.first_signal_input_index
)
# determine which input this packet contains data for
if not 0 <= input_index < self.nr_signal_inputs:
# packet describes an input that is out of bounds for us
raise ValueError(
f"Packet describes input {packet.header.data_id.signal_input_index}, "
f"but we are limited to describing {self.nr_signal_inputs} starting "
f"at index {self.first_signal_input_index}"
)
if packet.header.payload_error:
# cannot trust the data if a payload error is reported
self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1)
# don't raise, as packet is valid
return
# process the packet
self.parameters["nof_valid_payloads"][fpga_nr] += numpy.uint64(1)
self.parameters["sst_values"][input_index][
: packet.header.nof_statistics_per_packet
] = packet.payload()
self.parameters["sst_timestamps"][input_index] = numpy.float64(
packet.timestamp.timestamp()
)
self.parameters["integration_intervals"][
input_index
] = packet.header.integration_interval
self.parameters["subbands_calibrated"][
input_index
] = packet.header.subband_calibrated_flag
class XSTCollector(StatisticsCollector):
"""Class to process XST statistics packets.
......@@ -274,15 +96,6 @@ class XSTCollector(StatisticsCollector):
defaults.update(
{
# Number of packets received so far that we could parse correctly
# and do not have a payload error
"nof_valid_payloads": numpy.zeros(
(self.MAX_FPGAS,), dtype=numpy.uint64
),
# Packets that reported a payload error
"nof_payload_errors": numpy.zeros(
(self.MAX_FPGAS,), dtype=numpy.uint64
),
# Last value array we've constructed out of the packets
"xst_blocks": numpy.zeros(
(
......@@ -492,80 +305,3 @@ class XSTCollector(StatisticsCollector):
] = block
return matrix
class BSTCollector(StatisticsCollector):
"""Class to process SST statistics packets."""
# beamlets = 488 * 2 for the x and y polarisations
MAX_BEAMLETS = 488
def _default_parameters(self):
defaults = super()._default_parameters()
defaults.update(
{
# Number of packets received so far that we could parse correctly and
# do not have a payload error
"nof_valid_payloads": numpy.zeros(
(self.MAX_FPGAS,),
dtype=numpy.uint64,
),
# Packets that reported a payload error
"nof_payload_errors": numpy.zeros(
(self.MAX_FPGAS,),
dtype=numpy.uint64,
),
# Last value array we've constructed out of the packets
"bst_values": numpy.zeros(
(self.MAX_BEAMLETS, N_pol),
dtype=numpy.uint64,
),
"bst_timestamps": numpy.zeros(
(self.MAX_FPGAS,),
dtype=numpy.float64,
),
"integration_intervals": numpy.zeros(
(self.MAX_FPGAS,),
dtype=numpy.float32,
),
}
)
return defaults
def _parse_packet(self, packet: BSTPacket):
fpga_nr = self.gn_index_to_fpga_nr(packet.header.gn_index)
# number of beamlets in the packet
beamlets = packet.payload()
nr_beamlets = beamlets.shape[0]
first_beamlet = packet.header.data_id.beamlet_index
last_beamlet = first_beamlet + nr_beamlets
# determine which input this packet contains data for
if last_beamlet > self.MAX_BEAMLETS:
# packet describes an input that is out of bounds for us
raise ValueError(
f"Packet describes {nr_beamlets} beamlets starting at "
f"{packet.header.data_id.beamlet_index}, but we are limited "
f"to describing MAX_BEAMLETS={self.MAX_BEAMLETS}"
)
if packet.header.payload_error:
# cannot trust the data if a payload error is reported
self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1)
# don't raise, as packet is valid
return
# process the packet
self.parameters["nof_valid_payloads"][fpga_nr] += numpy.uint64(1)
self.parameters["bst_values"][first_beamlet:last_beamlet] = beamlets
self.parameters["bst_timestamps"][fpga_nr] = numpy.float64(
packet.timestamp.timestamp()
)
self.parameters["integration_intervals"][
fpga_nr
] = packet.header.integration_interval
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""Different types of receivers for statistics"""
from ._create import create
from ._file import FileReceiver
from ._receiver import Receiver
from ._tcp import TCPReceiver
from ._zmq import ZeroMQReceiver
__all__ = ["create", "FileReceiver", "TCPReceiver", "Receiver", "ZeroMQReceiver"]
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""Create receiver based on uri"""
from urllib.parse import urlsplit
from ._file import FileReceiver
from ._tcp import TCPReceiver
from ._zmq import ZeroMQReceiver
def create(uri):
"""Create a Receiver based on the given URI"""
parsed = urlsplit(uri)
if parsed.scheme == "tcp":
return TCPReceiver(parsed.hostname, parsed.port)
if parsed.scheme == "file":
return FileReceiver(parsed.path)
if parsed.scheme.startswith("zmq"):
return ZeroMQReceiver(uri[4:])
raise ValueError(f"Provided uri '{uri}' is not supported")
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
"""FileReceiver"""
import os
from lofar_station_client.statistics.receivers._receiver import Receiver
class FileReceiver(Receiver):
"""File receiver"""
def __init__(self, filename):
self.filename = filename
self.fileno = None
super().__init__()
def __iter__(self):
self.fileno = os.open(self.filename, os.O_RDONLY)
return super().__iter__()
def __del__(self):
os.close(self.fileno)
@property
def fdesc(self):
return self.fileno
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment