diff --git a/.gitignore b/.gitignore index 4c7f7e8cb91318300efc16acf0376575bdad03ec..59f1a3a1758d150c8a740022194e7cf75106032f 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,6 @@ lofar_station_client/_version.py # Caches __pycache__ + +# Git +*.orig diff --git a/README.md b/README.md index 267a5fd2e82a51fc45b2783bf55657ad18b7bf16..9287d521e529ccbfeb2b479f3ab39f34806f2948 100644 --- a/README.md +++ b/README.md @@ -131,6 +131,7 @@ tox -e debug tests.requests.test_prometheus ## Release notes +- 0.18.3 - Refactoring statistics packets. Moving tango to optional dependency `[tango]` - 0.18.2 - Bugfix when closing unused HDF5 files - 0.18.1 - Flush HDF5 files explicitly, reduce memory usage for XSTs - 0.18.0 - MultiStationObservation and StationFutures allow multi field observations diff --git a/VERSION b/VERSION index 503a21deb47d43c1715cd400032e4a9f82857c6f..267d7e011feae65732d5714726d601dbf9032289 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.18.2 +0.18.3 diff --git a/lofar_station_client/__init__.py b/lofar_station_client/__init__.py index b46630c3ccd7fecfb0c0753a743f08b165e52883..5b9128aab658c74f2230f692222fe0311ddf3d50 100644 --- a/lofar_station_client/__init__.py +++ b/lofar_station_client/__init__.py @@ -1,5 +1,8 @@ # -*- coding: utf-8 -*- +# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) +# SPDX-License-Identifier: Apache-2.0 + # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at diff --git a/lofar_station_client/_utils.py b/lofar_station_client/_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..6ef16c1fdef4a7cf66fe885253ade15fbbc4f93b --- /dev/null +++ b/lofar_station_client/_utils.py @@ -0,0 +1,14 @@ +# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) +# SPDX-License-Identifier: Apache-2.0 + +""" Provide package wide utils. """ + +import sys + + +def sys_exit_no_tango(): + """Exit with message that tango wasn't found""" + sys.exit( + "Error importing pytango, did you install the " + "optional dependency 'station-client[tango]'" + ) diff --git a/lofar_station_client/devices.py b/lofar_station_client/devices.py index dac55bfd6fd3f3bebdaa0977b7511f60604ac4a5..9473980eb06df0cac6a5d00850fceb0168d25fc8 100644 --- a/lofar_station_client/devices.py +++ b/lofar_station_client/devices.py @@ -11,8 +11,14 @@ import logging from functools import lru_cache import numpy -from tango import DevFailed, DeviceProxy -from tango import ExtractAs + +from lofar_station_client._utils import sys_exit_no_tango + +try: + from tango import DevFailed, DeviceProxy + from tango import ExtractAs +except ImportError: + sys_exit_no_tango() logger = logging.getLogger() diff --git a/lofar_station_client/observation/station_observation_future.py b/lofar_station_client/observation/station_observation_future.py index c3364b4c2faf71a6dee5299751a6d055fb267476..2dc7f0052800aea5bd9ca1162b1334d6ad423ff7 100644 --- a/lofar_station_client/observation/station_observation_future.py +++ b/lofar_station_client/observation/station_observation_future.py @@ -6,17 +6,21 @@ Internal class for station states to observations """ import concurrent.futures -from json import dumps import logging +from json import dumps from typing import List -from tango import DeviceProxy, GreenMode, DevFailed - +from lofar_station_client._utils import sys_exit_no_tango from lofar_station_client.observation.constants import ( OBSERVATION_CONTROL_DEVICE_NAME, OBSERVATION_FIELD_DEVICE_NAME, ) +try: + from tango import DeviceProxy, GreenMode, DevFailed +except ImportError: + sys_exit_no_tango() + logger = logging.getLogger() diff --git a/lofar_station_client/statistics/collector.py b/lofar_station_client/statistics/collector.py index b66799a884392cc482542d3abfd5d4bf1731c93c..f03d18e3122e332edc9742ce06966520408f75c7 100644 --- a/lofar_station_client/statistics/collector.py +++ b/lofar_station_client/statistics/collector.py @@ -17,16 +17,15 @@ import logging import numpy +from lofar_station_client.dts.constants import N_pol from lofar_station_client.math.baseline import baseline_from_index from lofar_station_client.math.baseline import baseline_index # TODO(Corne): Discuss moving to lofar_common_python library? from lofar_station_client.math.baseline import nr_baselines -from lofar_station_client.statistics.packet import BSTPacket -from lofar_station_client.statistics.packet import SSTPacket -from lofar_station_client.statistics.packet import XSTPacket - -from lofar_station_client.dts.constants import N_pol +from lofar_station_client.statistics.packets import BSTPacket, StatisticsPacket +from lofar_station_client.statistics.packets import SSTPacket +from lofar_station_client.statistics.packets import XSTPacket logger = logging.getLogger() @@ -66,12 +65,12 @@ class StatisticsCollector(abc.ABC): Throws a ValueError if more than MAX_FPGAS are encountered.""" - def find_fpga_nr(gn_index): - indices = numpy.where(self.parameters["gn_indices"] == gn_index)[0] + def find_fpga_nr(gni): + indices = numpy.where(self.parameters["gn_indices"] == gni)[0] if len(indices): return indices[0] - raise ValueError(f"Could not find gn_index {gn_index}") + raise ValueError(f"Could not find gn_index {gni}") try: return find_fpga_nr(gn_index) @@ -91,7 +90,7 @@ class StatisticsCollector(abc.ABC): self.parameters["gn_indices"][new_fpga_nr] = gn_index return new_fpga_nr - def process_packet(self, packet): + def process_packet(self, packet: StatisticsPacket): """Baseclass wrapper around performing parse_packet""" self.parameters["nof_packets"] += numpy.uint64(1) @@ -100,7 +99,7 @@ class StatisticsCollector(abc.ABC): self._parse_packet(packet) except Exception as err: self.parameters["last_invalid_packet"] = numpy.frombuffer( - packet, dtype=numpy.uint8 + packet.raw, dtype=numpy.uint8 ) self.parameters["nof_invalid_packets"] += numpy.uint64(1) @@ -109,7 +108,7 @@ class StatisticsCollector(abc.ABC): raise ValueError("Could not parse statistics packet") from err @abc.abstractmethod - def _parse_packet(self, packet): + def _parse_packet(self, packet: StatisticsPacket): """Update any information based on this packet.""" raise NotImplementedError @@ -164,23 +163,26 @@ class SSTCollector(StatisticsCollector): return defaults - def _parse_packet(self, packet): - fields = SSTPacket(packet) + def _parse_packet(self, packet: SSTPacket): + if not isinstance(packet, SSTPacket): + raise ValueError("Packet is not of type SSTPacket") - fpga_nr = self.gn_index_to_fpga_nr(fields.gn_index) + fpga_nr = self.gn_index_to_fpga_nr(packet.header.gn_index) - input_index = fields.signal_input_index - self.first_signal_input_index + input_index = ( + packet.header.data_id.signal_input_index - self.first_signal_input_index + ) # determine which input this packet contains data for if not 0 <= input_index < self.nr_signal_inputs: # packet describes an input that is out of bounds for us raise ValueError( - f"Packet describes input {fields.signal_input_index}, but we are " - f"limited to describing {self.nr_signal_inputs} starting at index " - f"{self.first_signal_input_index}" + f"Packet describes input {packet.header.data_id.signal_input_index}, " + f"but we are limited to describing {self.nr_signal_inputs} starting " + f"at index {self.first_signal_input_index}" ) - if fields.payload_error: + if packet.header.payload_error: # cannot trust the data if a payload error is reported self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1) @@ -190,17 +192,17 @@ class SSTCollector(StatisticsCollector): # process the packet self.parameters["nof_valid_payloads"][fpga_nr] += numpy.uint64(1) self.parameters["sst_values"][input_index][ - : fields.nof_statistics_per_packet - ] = fields.payload() + : packet.header.nof_statistics_per_packet + ] = packet.payload() self.parameters["sst_timestamps"][input_index] = numpy.float64( - fields.timestamp().timestamp() + packet.timestamp.timestamp() ) self.parameters["integration_intervals"][ input_index - ] = fields.integration_interval() + ] = packet.header.integration_interval self.parameters["subbands_calibrated"][ input_index - ] = fields.subband_calibrated_flag + ] = packet.header.subband_calibrated_flag class XSTCollector(StatisticsCollector): @@ -333,12 +335,13 @@ class XSTCollector(StatisticsCollector): # prefer the first one in case of multiple minima return numpy.where(self.parameters["xst_timestamps"] == oldest_timestamp)[0][0] - def _parse_packet(self, packet): - fields = XSTPacket(packet) + def _parse_packet(self, packet: XSTPacket): + if not isinstance(packet, XSTPacket): + raise ValueError("Packet is not of type XSTPacket") - fpga_nr = self.gn_index_to_fpga_nr(fields.gn_index) + fpga_nr = self.gn_index_to_fpga_nr(packet.header.gn_index) - if fields.payload_error: + if packet.header.payload_error: # cannot trust the data if a payload error is reported self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1) @@ -346,43 +349,44 @@ class XSTCollector(StatisticsCollector): return # the blocks must be of size BLOCK_LENGTH x BLOCK_LENGTH - if fields.nof_signal_inputs != self.BLOCK_LENGTH: + if packet.header.nof_signal_inputs != self.BLOCK_LENGTH: raise ValueError( - f"Packet describes a block of {fields.nof_signal_inputs} x " - f"{fields.nof_signal_inputs} baselines, but we can only parse blocks" - f"of {self.BLOCK_LENGTH} x {self.BLOCK_LENGTH} baselines" + f"Packet describes a block of {packet.header.nof_signal_inputs} x " + f"{packet.header.nof_signal_inputs} baselines, but we can only parse " + f"blocks of {self.BLOCK_LENGTH} x {self.BLOCK_LENGTH} baselines" ) # check whether set of baselines in this packet are not out of bounds for antenna in (0, 1): if not ( 0 - <= fields.first_baseline[antenna] - + fields.nof_signal_inputs + <= packet.header.data_id.first_baseline[antenna] + + packet.header.nof_signal_inputs - self.first_signal_input_index <= self.nr_signal_inputs ): # packet describes an input that is out of bounds for us raise ValueError( - f"Packet describes {fields.nof_signal_inputs} x " - f"{fields.nof_signal_inputs} baselines starting at " - f"{fields.first_baseline}, but we are limited to describing " - f"{self.nr_signal_inputs} starting at offset " + f"Packet describes {packet.header.nof_signal_inputs} x" + f"{packet.header.nof_signal_inputs} baselines starting at" + f"{packet.header.data_id.first_baseline}, but we are limited to " + f"describing {self.nr_signal_inputs} starting at offset " f"{self.first_signal_input_index}" ) # the blocks of baselines need to be tightly packed, and thus be provided # at exact intervals - if fields.first_baseline[antenna] % self.BLOCK_LENGTH != 0: + if packet.header.data_id.first_baseline[antenna] % self.BLOCK_LENGTH != 0: raise ValueError( - f"Packet describes baselines starting at {fields.first_baseline}, " + f"Packet describes baselines starting at " + f"{packet.header.data_id.first_baseline}, " f"but we require a multiple of BLOCK_LENGTH={self.BLOCK_LENGTH}" ) # Make sure we always have a baseline (a,b) with a>=b. If not, we swap the # indices and mark that the data must be conjugated and transposed when # processed. - first_baseline = fields.first_baseline + first_baseline = packet.header.data_id.first_baseline if first_baseline[0] < first_baseline[1]: conjugated = True first_baseline = (first_baseline[1], first_baseline[0]) @@ -396,13 +400,13 @@ class XSTCollector(StatisticsCollector): ) # we keep track of multiple subbands. select slot for this one - subband_slot = self.select_subband_slot(fields.subband_index) + subband_slot = self.select_subband_slot(packet.header.data_id.subband_index) assert 0 <= subband_slot < self.nr_parallel_subbands, ( f"Selected slot{subband_slot}, but only have room for " f"{self.nr_parallel_subbands}. Existing slots are " f"{self.parameters['xst_subbands']}, processing subband " - f"{fields.subband_index}." + f"{packet.header.data_id.subband_index}." ) # the payload contains complex values for the block of baselines of size @@ -427,18 +431,18 @@ class XSTCollector(StatisticsCollector): self.parameters["nof_valid_payloads"][fpga_nr] += numpy.uint64(1) self.parameters["xst_blocks"][ - subband_slot, block_index, : fields.nof_statistics_per_packet - ] = fields.payload() + subband_slot, block_index, : packet.header.nof_statistics_per_packet + ] = packet.payload() self.parameters["xst_timestamps"][subband_slot] = numpy.float64( - fields.timestamp().timestamp() + packet.timestamp.timestamp() ) self.parameters["xst_conjugated"][subband_slot, block_index] = conjugated self.parameters["xst_subbands"][subband_slot] = numpy.uint16( - fields.subband_index + packet.header.data_id.subband_index ) self.parameters["xst_integration_intervals"][ subband_slot - ] = fields.integration_interval() + ] = packet.header.integration_interval def xst_values(self, subband_indices=None): """xst_blocks, but as a matrix[len(subband_indices)][MAX_INPUTS][MAX_INPUTS] @@ -530,15 +534,13 @@ class BSTCollector(StatisticsCollector): return defaults - def _parse_packet(self, packet): - fields = BSTPacket(packet) - - fpga_nr = self.gn_index_to_fpga_nr(fields.gn_index) + def _parse_packet(self, packet: BSTPacket): + fpga_nr = self.gn_index_to_fpga_nr(packet.header.gn_index) # number of beamlets in the packet - beamlets = fields.payload() + beamlets = packet.payload() nr_beamlets = beamlets.shape[0] - first_beamlet = fields.beamlet_index + first_beamlet = packet.header.data_id.beamlet_index last_beamlet = first_beamlet + nr_beamlets # determine which input this packet contains data for @@ -546,11 +548,11 @@ class BSTCollector(StatisticsCollector): # packet describes an input that is out of bounds for us raise ValueError( f"Packet describes {nr_beamlets} beamlets starting at " - f"{fields.beamlet_index}, but we are limited " + f"{packet.header.data_id.beamlet_index}, but we are limited " f"to describing MAX_BEAMLETS={self.MAX_BEAMLETS}" ) - if fields.payload_error: + if packet.header.payload_error: # cannot trust the data if a payload error is reported self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1) @@ -562,8 +564,8 @@ class BSTCollector(StatisticsCollector): self.parameters["bst_values"][first_beamlet:last_beamlet] = beamlets self.parameters["bst_timestamps"][fpga_nr] = numpy.float64( - fields.timestamp().timestamp() + packet.timestamp.timestamp() ) self.parameters["integration_intervals"][ fpga_nr - ] = fields.integration_interval() + ] = packet.header.integration_interval diff --git a/lofar_station_client/statistics/packet.py b/lofar_station_client/statistics/packet.py deleted file mode 100644 index 1d984aff428bc47f2126e24514a8d60395ac7a88..0000000000000000000000000000000000000000 --- a/lofar_station_client/statistics/packet.py +++ /dev/null @@ -1,798 +0,0 @@ -# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) -# SPDX-License-Identifier: Apache-2.0 - -"""All types of packets as parsed by different collectors""" - -# TODO(Corne): Remove disable E1121 once reshape parameters are verified - -# too-many-instance-attributes, too-many-function-args -# pylint: disable=R0902,E1121 - -import struct -from datetime import datetime -from datetime import timezone - -import numpy - -N_POL = 2 -N_COMPLEX = 2 - - -def get_bit_value(value: bytes, first_bit: int, last_bit: int = None) -> int: - """Return bits [first_bit:last_bit] from value as integer. Bit 0 = LSB. - - For example, extracting bits 2-3 from b'01100' returns 11 binary = 3 decimal: - get_bit_value(b'01100', 2, 3) == 3 - - If 'last_bit' is not given, just the value of bit 'first_bit' is returned. - """ - - # default last_bit to first_bit - if last_bit is None: - last_bit = first_bit - - return value >> first_bit & ((1 << (last_bit - first_bit + 1)) - 1) - - -class SDPPacket: - """Models a UDP packet from SDP. - - Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). - - The following fields are exposed as properties & functions. The _raw fields come - directly from the packet, and have more user-friendly alternatives for - interpretation: - - marker_raw - - packet marker as byte. - - marker() - - packet marker as character. 'S' = SST, 'X' = XST, 'B' = BST - - version_id - - packet format version. - - observation_id - - observation identifier. - - station_info - - bit field with station information, encoding several other properties. - - station_id - - station identifier. - - antenna_field_index - - antenna field id. 0 = LBA/HBA/HBA0, 1 = HBA1 - - source_info - - bit field with input information, encoding several other properties. - - antenna_band_index - - antenna type. 0 = low band, 1 = high band. - - nyquist_zone_index - - nyquist zone of filter - - - 0 = 0 -- 1/2 * t_adc Hz (low band), - - 1 = 1/2 * t_adc -- t_adc Hz (high band), - - 2 = t_adc -- 3/2 * t_adc Hz (high band). - - t_adc - - sampling clock. 0 = 160 MHz, 1 = 200 MHz. - - fsub_type - - sampling method. 0 = critically sampled, 1 = oversampled. - - payload_error - - 0 = data is ok, 1 = data is corrupted (a fault was encountered). - - beam_repositioning_flag - - 0 = data is ok, 1 = beam got repositioned during packet construction - (BST only). - - gn_index - - global index of FPGA that emitted this packet. - - block_period_raw - - block period, in nanoseconds. - - block_period() - - block period, in seconds. - - block_serial_number - - timestamp of the data, in block periods since 1970. - - timestamp() - - timestamp of the data, as a datetime object. - - """ - - def __init__(self, packet: bytes): - self.antenna_band_index = None - self.nyquist_zone_index = None - self.t_adc = None - self.fsub_type = None - self.payload_error = None - self.beam_repositioning_flag = None - self.station_id = None - self.antenna_field_index = None - # self.source_info 5-8 are reserved - self.gn_index = None - self.nof_statistics_per_packet = None - self.nof_bytes_per_statistic = None - - self.packet = packet - - self.unpack() - - # Only parse valid statistics packets from SDP, reject everything else - if self.marker_raw not in self.valid_markers(): - raise ValueError( - f"Invalid packet of type {self.__class__.__name__}: packet marker" - f"(first byte) is {self.marker}, not one of {self.valid_markers()}." - ) - - def __str__(self): - return ( - f"SDPPacket(marker={self.marker}, " - f"station={self.station_id}, field={self.antenna_field_name()}, " - f"gn_index={self.gn_index}, " - f"timestamp={self.timestamp().strftime('%FT%T')}, " - f"payload_error={self.payload_error})" - ) - - # format string for the header, see unpack below - HEADER_FORMAT = ">cBL HH xxxxx xxxxxxx HQ" - HEADER_SIZE = struct.calcsize(HEADER_FORMAT) - - @classmethod - def valid_markers(cls): - """Valid values for the 'marker_raw' header field for this class. - - Each new packet class that introduces a new marker should be added - to the PACKET_CLASS_FOR_MARKER registry, which holds the mapping - marker -> packet class. - """ - - # return all markers registered in PACKET_CLASS_FOR_MARKER which are for this - # class or any of its specialisations - return [ - marker - for marker, klass in PACKET_CLASS_FOR_MARKER.items() - if issubclass(klass, cls) - ] - - def downcast(self): - """Return a more specialised object for this packet.""" - - klass = PACKET_CLASS_FOR_MARKER.get(self.marker_raw, SDPPacket) - return klass(self.packet) - - def unpack(self): - """Unpack the packet into properties of this object.""" - - # unpack fields - try: - ( - self.marker_raw, - self.version_id, - self.observation_id, - self.station_info, - self.source_info, - self.block_period_raw, - self.block_serial_number, - ) = struct.unpack(self.HEADER_FORMAT, self.packet[: self.HEADER_SIZE]) - except struct.error as ex: - raise ValueError("Error parsing statistics packet") from ex - - # unpack the fields we just updated - self.unpack_station_info() - self.unpack_source_info() - - def unpack_station_info(self): - """Unpack the station_info field into properties of this object.""" - - self.station_id = get_bit_value(self.station_info, 0, 9) - self.antenna_field_index = get_bit_value(self.station_info, 10, 15) - - def unpack_source_info(self): - """Unpack the source_info field into properties of this object.""" - - self.antenna_band_index = get_bit_value(self.source_info, 15) - self.nyquist_zone_index = get_bit_value(self.source_info, 13, 14) - self.t_adc = get_bit_value(self.source_info, 12) - self.fsub_type = get_bit_value(self.source_info, 11) - self.payload_error = get_bit_value(self.source_info, 10) != 0 - self.beam_repositioning_flag = get_bit_value(self.source_info, 9) != 0 - # self.source_info 5-8 are reserved - self.gn_index = get_bit_value(self.source_info, 0, 4) - - def expected_size(self) -> int: - """Determine packet size (header + payload) from the header""" - - # the generic header does not contain enough information to determine the - # payload size - raise NotImplementedError - - def size(self) -> int: - """The actual size of this packet.""" - - return len(self.packet) - - @property - def marker(self) -> str: - """Return the type of statistic: - - - 'S' = SST - - 'B' = BST - - 'X' = XST - - 'b' = beamlet - """ - - try: - return self.marker_raw.decode("ascii") - except UnicodeDecodeError: - # non-ascii (>127) character, return as binary - # - # this is typically not visible to the user, as these packets are not SDP - # statistics packets, which the constructor will refuse to accept. - return self.marker_raw - - def antenna_field_name(self) -> str: - """Returns the name of the antenna field as one of LBA-#0, HBA-#0, HBA-#1.""" - - antenna_band = ["LBA", "HBA"][self.antenna_band_index] - - # NB: This returns HBA-#0 for both HBA0 and HBA - return f"{antenna_band}-#{self.antenna_field_index}" - - def block_period(self) -> float: - """Return the block period, in seconds.""" - - return self.block_period_raw / 1e9 - - def timestamp(self) -> datetime: - """Returns the timestamp of the data in this packet. - - :return: datetime from block_serial_number and block period but - datetime.min if the block_serial_number in the packet is not set - datetime.max if the timestamp cannot be represented in python - """ - - try: - return datetime.fromtimestamp( - self.block_serial_number * self.block_period(), timezone.utc - ) - except ValueError: - # Let's not barf anytime we want to print a header - return datetime.max - - def header(self) -> dict: - """Return all the header fields as a dict.""" - - header = { - "marker": self.marker, - "version_id": self.version_id, - "observation_id": self.observation_id, - "station_info": { - "_raw": self.station_info, - "station_id": self.station_id, - "antenna_field_index": self.antenna_field_index, - }, - "source_info": { - "_raw": self.source_info, - "antenna_band_index": self.antenna_band_index, - "nyquist_zone_index": self.nyquist_zone_index, - "t_adc": self.t_adc, - "fsub_type": self.fsub_type, - "payload_error": self.payload_error, - "beam_repositioning_flag": self.beam_repositioning_flag, - "gn_index": self.gn_index, - }, - "block_period_raw": self.block_period_raw, - "block_period": self.block_period(), - "block_serial_number": self.block_serial_number, - "timestamp": self.timestamp(), - } - - if self.t_adc == 0: - header["f_adc"] = 160 - elif self.t_adc == 1: - header["f_adc"] = 200 - - return header - - def payload(self, signed=False) -> numpy.array: - """The payload of this packet, as a linear array.""" - - # derive which and how many elements to read from the packet header - bytecount_to_unsigned_struct_type = ( - {1: "b", 2: "h", 4: "i", 8: "q"} - if signed - else {1: "B", 2: "H", 4: "I", 8: "Q"} - ) - format_str = ( - f">{self.nof_statistics_per_packet}" - f"{bytecount_to_unsigned_struct_type[self.nof_bytes_per_statistic]}" - ) - - return numpy.array( - struct.unpack( - format_str, - self.packet[ - self.HEADER_SIZE : self.HEADER_SIZE + struct.calcsize(format_str) - ], - ) - ) - - -class BeamletPacket(SDPPacket): - """Models a beamlet UDP packet from SDP. - - Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). - - The following additional fields are exposed as properties & functions. - - beamlet_width: - - bits/beamlet (4, 8, 16). - - beamlet_scale: - - scaling of beamlets prior to quantisation. - - beamlet_index: - - index of the first beamlet in the payload. - - nof_blocks_per_packet: - - number of blocks (timestamps) of data in the payload. - - nof_beamlets_per_block: - - number of beamlets in the payload. - """ - - # format string for the header, see unpack below - HEADER_FORMAT = ">cBL HBH xxxx HHBHHQ" - HEADER_SIZE = struct.calcsize(HEADER_FORMAT) - - def __init__(self, packet: bytes): - self.beamlet_width = None - - super().__init__(packet) - - def __str__(self): - return ( - f"BeamletPacket(" - f"station={self.station_id}, field={self.antenna_field_name()}, " - f"gn_index={self.gn_index}, " - f"timestamp={self.timestamp().strftime('%FT%T')}, " - f"payload_error={self.payload_error})" - ) - - def unpack(self): - """Unpack the packet into properties of this object.""" - - # unpack fields - try: - ( - self.marker_raw, - self.version_id, - self.observation_id, - self.station_info, - self.source_info_h, - self.source_info_l, - self.beamlet_scale, - self.beamlet_index, - self.nof_blocks_per_packet, - self.nof_beamlets_per_block, - self.block_period_raw, - self.block_serial_number, - ) = struct.unpack(self.HEADER_FORMAT, self.packet[: self.HEADER_SIZE]) - except struct.error as ex: - raise ValueError("Error parsing beamlet packet") from ex - - self.source_info = (self.source_info_h << 16) + self.source_info_l - - # unpack the fields we just updated - self.unpack_station_info() - self.unpack_source_info() - - # set computed fields in base class - self.nof_statistics_per_packet = self._nof_statistics_per_packet() - self.nof_bytes_per_statistic = self._nof_bytes_per_statistic() - - def unpack_source_info(self): - """Unpack the source_info field into properties of this object.""" - - super().unpack_source_info() - - self.beamlet_width = get_bit_value(self.source_info, 8, 11) or 16 - - def expected_size(self) -> int: - """The size this packet should be (header + payload), extracted from header.""" - - return ( - self.HEADER_SIZE - + self.nof_statistics_per_packet * self.nof_bytes_per_statistic - ) - - def _nof_statistics_per_packet(self) -> int: - """Compute number of statistics per packet.""" - - return ( - self.nof_blocks_per_packet - * self.nof_beamlets_per_block - * N_POL - * N_COMPLEX - * self.beamlet_width - // 8 - ) - - def _nof_bytes_per_statistic(self) -> int: - """Determine number of bytes per statistic.""" - - if self.beamlet_width == 8: - # cint8 data [-127, 127] - return 1 - if self.beamlet_width == 16: - # cint16 data [-32767, 32767] - return 2 - if self.beamlet_width == 4: - # cint4 data [-7, 7], packet in 1 byte - return 1 - - return -1 - - def header(self) -> dict: - """Return all the header fields as a dict.""" - - header = super().header() - - header["source_info"]["beamlet_width"] = self.beamlet_width - - header.update( - { - "beamlet_scale": self.beamlet_scale, - "beamlet_index": self.beamlet_index, - "nof_blocks_per_packet": self.nof_blocks_per_packet, - "nof_beamlets_per_block": self.nof_beamlets_per_block, - } - ) - - return header - - def payload_raw(self): - """Generate payload as linear array""" - if self.beamlet_width == 4: - return ( - super() - .payload(signed=True) - .reshape(self.nof_blocks_per_packet, self.nof_beamlets_per_block, N_POL) - ) - - return ( - super() - .payload(signed=True) - .reshape( - self.nof_blocks_per_packet, - self.nof_beamlets_per_block, - N_POL, - N_COMPLEX, - ) - ) - - def payload(self, signed=True): - """A property that should probably be removed""" - - if self.beamlet_width == 4: - # real in low 4 bits, imag in high 4 bits (both signed!) - payload_raw = self.payload_raw() - - # shift extends sign, so we prefer it over bitwise and - return (payload_raw << 4 >> 4) + (payload_raw >> 4) * 1j - - return ( - # view() reduces the last dimension to size 1, so drop it - self.payload_raw() - .astype(numpy.float32) - .view(numpy.complex64)[:, :, :, 0] - ) - - -class StatisticsPacket(SDPPacket): - """Models a statistics UDP packet from SDP. - - Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). - - The following additional fields are exposed as properties & functions. - - subband_calibrated_flag 1 = subband data had subband calibration values applied, - 0 = not. - - data_id bit field with payload information, encoding several - other properties. - - nof_signal_inputs number of inputs that contributed to data in this packet. - nof_bytes_per_statistics word size of each statistic. - nof_statistics_per_packet number of statistic data points in the payload. - - integration_interval_raw integration interval, in block periods. - integration_interval() integration interval, in seconds. - """ - - # statistics format string for the header, see unpack below - HEADER_FORMAT = ">cBL HHB BHL BBH HQ" - HEADER_SIZE = struct.calcsize(HEADER_FORMAT) - - def __init__(self, packet: bytes): - self.subband_calibrated_flag = None - - super().__init__(packet) - - def __str__(self): - return ( - f"StatisticsPacket(marker={self.marker}, " - f"station={self.station_id}, field={self.antenna_field_name()}, " - f"gn_index={self.gn_index}, " - f"timestamp={self.timestamp().strftime('%FT%T')}, " - f"payload_error={self.payload_error})" - ) - - def unpack(self): - """Unpack the packet into properties of this object.""" - - # unpack fields - try: - ( - self.marker_raw, - self.version_id, - self.observation_id, - self.station_info, - self.source_info, - # reserved byte - _, - # integration interval, in block periods. This field is 3 bytes, - # big endian -- combine later - integration_interval_hi, - integration_interval_lo, - self.data_id, - self.nof_signal_inputs, - self.nof_bytes_per_statistic, - self.nof_statistics_per_packet, - self.block_period_raw, - self.block_serial_number, - ) = struct.unpack(self.HEADER_FORMAT, self.packet[: self.HEADER_SIZE]) - - self.integration_interval_raw = ( - integration_interval_hi << 16 - ) + integration_interval_lo - except struct.error as ex: - raise ValueError("Error parsing statistics packet") from ex - - # unpack the fields we just updated - self.unpack_station_info() - self.unpack_source_info() - self.unpack_data_id() - - def expected_size(self) -> int: - """The size this packet should be (header + payload), extracted from header.""" - - return ( - self.HEADER_SIZE - + self.nof_statistics_per_packet * self.nof_bytes_per_statistic - ) - - def unpack_source_info(self): - """Unpack the source_info field into properties of this object.""" - - super().unpack_source_info() - self.subband_calibrated_flag = get_bit_value(self.source_info, 8) != 0 - - def unpack_data_id(self): - """Unpack the data_id field into properties of this object.""" - # only useful in specialisations (XST/SST/BST) - - def integration_interval(self) -> float: - """Returns the integration interval, in seconds.""" - - # Translate to seconds using the block period - return self.integration_interval_raw * self.block_period() - - def header(self) -> dict: - """Return all the header fields as a dict.""" - - header = super().header() - - header["source_info"]["subband_calibrated_flag"] = self.subband_calibrated_flag - - header.update( - { - "data_id": { - "_raw": self.data_id, - }, - "integration_interval_raw": self.integration_interval_raw, - "integration_interval": self.integration_interval(), - "nof_signal_inputs": self.nof_signal_inputs, - "nof_bytes_per_statistic": self.nof_bytes_per_statistic, - "nof_statistics_per_packet": self.nof_statistics_per_packet, - } - ) - - return header - - -class SSTPacket(StatisticsPacket): - """Models an SST statistics UDP packet from SDP. - - The following fields are exposed as properties & functions. - - signal_input_index: - input (antenna polarisation) index for which this packet contains statistics - - payload[nof_statistics_per_packet]: - SST statistics, an array of amplitudes per subband. - """ - - def __init__(self, packet: bytes): - self.signal_input_index = None - - super().__init__(packet) - - def __str__(self): - return ( - f"SSTPacket(" - f"station={self.station_id}, field={self.antenna_field_name()}, " - f"gn_index={self.gn_index}, " - f"timestamp={self.timestamp().strftime('%FT%T')}, " - f"payload_error={self.payload_error}, " - f"signal_input={self.signal_input_index})" - ) - - def unpack_data_id(self): - super().unpack_data_id() - - self.signal_input_index = get_bit_value(self.data_id, 0, 7) - - def header(self): - header = super().header() - - header["data_id"]["signal_input_index"] = self.signal_input_index - - return header - - def payload(self, signed=True): - return super().payload(signed=False) - - -class XSTPacket(StatisticsPacket): - """Models an XST statistics UDP packet from SDP. - - The following fields are exposed as properties & functions. - - subband_index: - subband number for which this packet contains statistics. - first_baseline: - first antenna pair for which this packet contains statistics. - - payload[nof_signal_inputs][nof_signal_inputs] the baselines, starting from - first_baseline - """ - - def __init__(self, packet: bytes): - self.subband_index = None - self.first_baseline = None - - super().__init__(packet) - - def __str__(self): - last_baseline = ( - self.first_baseline[0] + self.nof_signal_inputs - 1, - self.first_baseline[1] + self.nof_signal_inputs - 1, - ) - return ( - f"XSTPacket(" - f"station={self.station_id}, field={self.antenna_field_name()}, " - f"gn_index={self.gn_index}, " - f"timestamp={self.timestamp().strftime('%FT%T')}, " - f"payload_error={self.payload_error}, " - f"subband={self.subband_index}, " - f"baselines={self.first_baseline} - {last_baseline})" - ) - - def unpack_data_id(self): - super().unpack_data_id() - - self.subband_index = get_bit_value(self.data_id, 16, 24) - self.first_baseline = ( - get_bit_value(self.data_id, 8, 15), - get_bit_value(self.data_id, 0, 7), - ) - - def header(self): - header = super().header() - - header["data_id"]["subband_index"] = self.subband_index - header["data_id"]["first_baseline"] = self.first_baseline - - return header - - def payload(self, signed=True): - return super().payload(signed=True) - - -class BSTPacket(StatisticsPacket): - """Models an BST statistics UDP packet from SDP. - - The following fields are exposed as properties & functions. - - beamlet_index: - the number of the beamlet for which this packet holds statistics. - """ - - def __init__(self, packet: bytes): - self.beamlet_index = None - - super().__init__(packet) - - def __str__(self): - first_beamlet = self.beamlet_index - last_beamlet = first_beamlet + (self.nof_statistics_per_packet // N_POL) - 1 - return ( - f"BSTPacket(" - f"station={self.station_id}, field={self.antenna_field_name()}, " - f"gn_index={self.gn_index}, " - f"timestamp={self.timestamp().strftime('%FT%T')}, " - f"payload_error={self.payload_error}, " - f"beamlets={first_beamlet} - {last_beamlet})" - ) - - def unpack_data_id(self): - super().unpack_data_id() - - self.beamlet_index = get_bit_value(self.data_id, 0, 15) - - def header(self): - header = super().header() - - header["data_id"]["beamlet_index"] = self.beamlet_index - - return header - - def payload(self, signed=True): - # We have signed values, per beamlet in pairs - # for each polarisation. - return super().payload(signed=True).reshape(-1, N_POL) - - -# Which class to use for which marker. -# -# NB: Python does not allow us to register from inside the class, -# as we cannot reference the class during its construction. -PACKET_CLASS_FOR_MARKER = { - b"b": BeamletPacket, - b"S": SSTPacket, - b"B": BSTPacket, - b"X": XSTPacket, -} diff --git a/lofar_station_client/statistics/packets/__init__.py b/lofar_station_client/statistics/packets/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..9222ac37cb42595022b1cdeffcebff89594c1849 --- /dev/null +++ b/lofar_station_client/statistics/packets/__init__.py @@ -0,0 +1,23 @@ +# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) +# SPDX-License-Identifier: Apache-2.0 + +"""All types of packets as parsed by different collectors""" + +from ._beamlet import BeamletHeader, BeamletPacket +from ._statistics import ( + BSTPacket, + XSTPacket, + StatisticsHeader, + StatisticsPacket, + SSTPacket, +) + +__all__ = [ + "BSTPacket", + "XSTPacket", + "SSTPacket", + "StatisticsHeader", + "StatisticsPacket", + "BeamletPacket", + "BeamletHeader", +] diff --git a/lofar_station_client/statistics/packets/_base.py b/lofar_station_client/statistics/packets/_base.py new file mode 100644 index 0000000000000000000000000000000000000000..1b6cf4e429289dc85aa388d634a521ae2cf535da --- /dev/null +++ b/lofar_station_client/statistics/packets/_base.py @@ -0,0 +1,282 @@ +# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) +# SPDX-License-Identifier: Apache-2.0 + +# pylint: disable=too-many-instance-attributes + +""" Base classes for package definitions """ + +from abc import abstractmethod +from datetime import datetime, timezone +from typing import TypeVar + +from ._utils import get_bit_value + +N_POL = 2 +N_COMPLEX = 2 + +Subfields = TypeVar("Subfields") + + +class _BasePacketHeader: + """Models a UDP packet from SDP. + + Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). + + The following fields are exposed as properties & functions. The _raw fields come + directly from the packet, and have more user-friendly alternatives for + interpretation: + + marker_raw + + packet marker as byte. + + marker + + packet marker as character. 'S' = SST, 'X' = XST, 'B' = BST + + version_id + + packet format version. + + observation_id + + observation identifier. + + station_info + + bit field with station information, encoding several other properties. + + station_id + + station identifier. + + antenna_field_index + + antenna field id. 0 = LBA/HBA/HBA0, 1 = HBA1 + + source_info + + bit field with input information, encoding several other properties. + + antenna_band_index + + antenna type. 0 = low band, 1 = high band. + + nyquist_zone_index + + nyquist zone of filter + + - 0 = 0 -- 1/2 * t_adc Hz (low band), + - 1 = 1/2 * t_adc -- t_adc Hz (high band), + - 2 = t_adc -- 3/2 * t_adc Hz (high band). + + t_adc + + sampling clock. 0 = 160 MHz, 1 = 200 MHz. + + fsub_type + + sampling method. 0 = critically sampled, 1 = oversampled. + + payload_error + + 0 = data is ok, 1 = data is corrupted (a fault was encountered). + + beam_repositioning_flag + + 0 = data is ok, 1 = beam got repositioned during packet construction + (BST only). + + gn_index + + global index of FPGA that emitted this packet. + + block_period_raw + + block period, in nanoseconds. + + block_period + + block period, in seconds. + + block_serial_number + + timestamp of the data, in block periods since 1970. + + timestamp + + timestamp of the data, as a datetime object. + + """ + + def __init__(self, packet: bytes): + self.version_id: int = 0 + self.block_serial_number: int = 0 + self.block_period_raw: int = 0 + self.antenna_field_index: int = 0 + self.station_id: int = 0 + self.source_info: int = 0 + self.antenna_band_index: int = 0 + self.nyquist_zone_index: int = 0 + self.t_adc: int = 0 + self.fsub_type: int = 0 + self.payload_error: int = 0 + self.beam_repositioning_flag: int = 0 + # self.source_info 5-8 are reserved + self.gn_index: int = 0 + self.nof_statistics_per_packet: int = 0 + self.nof_bytes_per_statistic: int = 0 + self.marker_raw: bytes = bytes() + self.station_info: int = 0 + self.observation_id: int = 0 + + self.packet = packet + + self.unpack() + self.unpack_station_info() + self.unpack_source_info() + + # Only parse valid statistics packets from SDP, reject everything else + if self.marker_raw not in self.valid_markers(): + raise ValueError( + f"Invalid packet of type {self.__class__.__name__}: packet marker" + f"(first byte) is {self.marker}, " + f"not one of {self.valid_markers()}." + ) + + def __str__(self): + return ( + f"_BasePacketHeader(marker={self.marker}, " + f"station={self.station_id}, field={self.antenna_field_name}, " + f"gn_index={self.gn_index}, " + f"timestamp={self.timestamp.strftime('%FT%T')}, " + f"payload_error={self.payload_error})" + ) + + @property + def raw(self) -> bytes: + """Returns the raw packet header bytes""" + return self.packet + + @abstractmethod + def unpack(self): + """Unpack the packet bytes struct""" + + @abstractmethod + def valid_markers(self): + """Returns the valid packet markers""" + + def unpack_station_info(self): + """Unpack the station_info field into properties of this object.""" + + self.station_id = get_bit_value(self.station_info, 0, 9) + self.antenna_field_index = get_bit_value(self.station_info, 10, 15) + + def unpack_source_info(self): + """Unpack the source_info field into properties of this object.""" + + self.antenna_band_index = get_bit_value(self.source_info, 15) + self.nyquist_zone_index = get_bit_value(self.source_info, 13, 14) + self.t_adc = get_bit_value(self.source_info, 12) + self.fsub_type = get_bit_value(self.source_info, 11) + self.payload_error = get_bit_value(self.source_info, 10) != 0 + self.beam_repositioning_flag = get_bit_value(self.source_info, 9) != 0 + # self.source_info 5-8 are reserved + self.gn_index = get_bit_value(self.source_info, 0, 4) + + @abstractmethod + def expected_size(self) -> int: + """Determine packet size (header + payload) from the header""" + + # the generic header does not contain enough information to determine the + # payload size + raise NotImplementedError + + def size(self) -> int: + """The actual size of this packet.""" + + return len(self.packet) + + @property + def marker(self) -> str: + """Return the type of statistic: + + - 'S' = SST + - 'B' = BST + - 'X' = XST + - 'b' = beamlet + """ + + try: + return self.marker_raw.decode("ascii") + except UnicodeDecodeError: + # non-ascii (>127) character, return as binary + # + # this is typically not visible to the user, as these packets are not SDP + # statistics packets, which the constructor will refuse to accept. + return str() + + @property + def antenna_field_name(self) -> str: + """Returns the name of the antenna field as one of LBA-#0, HBA-#0, HBA-#1.""" + + antenna_band = ["LBA", "HBA"][self.antenna_band_index] + + # NB: This returns HBA-#0 for both HBA0 and HBA + return f"{antenna_band}-#{self.antenna_field_index}" + + @property + def block_period(self) -> float: + """Return the block period, in seconds.""" + + return self.block_period_raw / 1e9 + + @property + def timestamp(self) -> datetime: + """Returns the timestamp of the data in this packet. + + :return: datetime from block_serial_number and block period but + datetime.min if the block_serial_number in the packet is not set + datetime.max if the timestamp cannot be represented in python + """ + + try: + return datetime.fromtimestamp( + self.block_serial_number * self.block_period, timezone.utc + ) + except ValueError: + # Let's not barf anytime we want to print a header + return datetime.max + + def __iter__(self): + """Return all the header fields as a dict.""" + yield "marker", self.marker + yield "marker_raw", self.marker_raw[0] + yield "version_id", self.version_id + yield "observation_id", self.observation_id + yield "station_id", self.station_id + yield "station_info", { + "_raw": self.station_info, + "station_id": self.station_id, + "antenna_field_index": self.antenna_field_index, + } + + yield "source_info", { + "_raw": self.source_info, + "antenna_band_index": self.antenna_band_index, + "nyquist_zone_index": self.nyquist_zone_index, + "t_adc": self.t_adc, + "fsub_type": self.fsub_type, + "payload_error": self.payload_error, + "beam_repositioning_flag": self.beam_repositioning_flag, + "gn_index": self.gn_index, + } + yield "block_period_raw", self.block_period_raw + yield "block_period", self.block_period + yield "block_serial_number", self.block_serial_number + yield "timestamp", self.timestamp + + if self.t_adc == 0: + yield "f_adc", 160 + elif self.t_adc == 1: + yield "f_adc", 200 diff --git a/lofar_station_client/statistics/packets/_beamlet.py b/lofar_station_client/statistics/packets/_beamlet.py new file mode 100644 index 0000000000000000000000000000000000000000..cf0e3ed555a5ab4f13c19757c75f6daa7a54cba9 --- /dev/null +++ b/lofar_station_client/statistics/packets/_beamlet.py @@ -0,0 +1,224 @@ +# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) +# SPDX-License-Identifier: Apache-2.0 + +# pylint: disable=too-many-instance-attributes,too-many-function-args,duplicate-code + +""" Beamlet related package definitions """ + +import struct + +import numpy + +from ._base import _BasePacketHeader +from ._utils import get_bit_value + +N_POL = 2 +N_COMPLEX = 2 + + +class BeamletHeader(_BasePacketHeader): + """Models a beamlet UDP packet from SDP. + + Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). + + The following additional fields are exposed as properties & functions. + + beamlet_width: + + bits/beamlet (4, 8, 16). + + beamlet_scale: + + scaling of beamlets prior to quantisation. + + beamlet_index: + + index of the first beamlet in the payload. + + nof_blocks_per_packet: + + number of blocks (timestamps) of data in the payload. + + nof_beamlets_per_block: + + number of beamlets in the payload. + """ + + # format string for the header, see unpack below + HEADER_FORMAT = ">cBL HBH xxxxx HHBHHQ" + HEADER_SIZE = struct.calcsize(HEADER_FORMAT) + + def __init__(self, packet: bytes): + self.beamlet_width: int = 0 + self.beamlet_scale: int = 0 + self.beamlet_index: int = 0 + self.nof_blocks_per_packet: int = 0 + self.nof_beamlets_per_block: int = 0 + super().__init__(packet) + + def __str__(self): + return ( + f"BeamletHeader(" + f"station={self.station_id}, field={self.antenna_field_name}, " + f"gn_index={self.gn_index}, " + f"timestamp={self.timestamp.strftime('%FT%T')}, " + f"payload_error={self.payload_error})" + ) + + def unpack(self): + """Unpack the packet into properties of this object.""" + + # unpack fields + try: + ( + self.marker_raw, + self.version_id, + self.observation_id, + self.station_info, + source_info_h, + source_info_l, + self.beamlet_scale, + self.beamlet_index, + self.nof_blocks_per_packet, + self.nof_beamlets_per_block, + self.block_period_raw, + self.block_serial_number, + ) = struct.unpack(self.HEADER_FORMAT, self.packet[: self.HEADER_SIZE]) + except struct.error as ex: + raise ValueError("Error parsing beamlet packet") from ex + + self.source_info = (source_info_h << 16) + source_info_l + # unpack the fields we just updated + self.unpack_station_info() + self.unpack_source_info() + + def valid_markers(self): + return [b"b"] + + def unpack_source_info(self): + """Unpack the source_info field into properties of this object.""" + + self.beamlet_width = get_bit_value(self.source_info, 8, 11) or 16 + + def expected_size(self) -> int: + """The size this packet should be (header + payload), extracted from header.""" + + return ( + self.HEADER_SIZE + + self.nof_statistics_per_packet * self.nof_bytes_per_statistic + ) + + @property + def nof_statistics_per_packet(self) -> int: + """Compute number of statistics per packet.""" + + return ( + self.nof_blocks_per_packet + * self.nof_beamlets_per_block + * N_POL + * N_COMPLEX + * self.beamlet_width + // 8 + ) + + @property + def nof_bytes_per_statistic(self) -> int: + """Determine number of bytes per statistic.""" + + if self.beamlet_width == 8: + # cint8 data [-127, 127] + return 1 + if self.beamlet_width == 16: + # cint16 data [-32767, 32767] + return 2 + if self.beamlet_width == 4: + # cint4 data [-7, 7], packet in 1 byte + return 1 + + return -1 + + def __iter__(self): + """Return all the header fields as a dict.""" + for key, value in super().__iter__(): + if key == "source_info": + yield key, {**value, "beamlet_width": self.beamlet_width} + else: + yield key, value + yield "beamlet_scale", self.beamlet_scale + yield "beamlet_index", self.beamlet_index + yield "nof_blocks_per_packet", self.nof_blocks_per_packet + yield "nof_beamlets_per_block", self.nof_beamlets_per_block + + +class BeamletPacket: + """Models a beamlet UDP packet from SDP. + + Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). + + """ + + def __init__(self, header: BeamletHeader, payload: bytes): + self._header: BeamletHeader = header + self.payload_data: bytes = payload + + @staticmethod + def parse_packet(packet_data: bytes) -> "BeamletPacket": + """Parses bytes into a BeamletPacket""" + header = BeamletHeader(packet_data[0 : BeamletHeader.HEADER_SIZE]) + return BeamletPacket(header, packet_data[BeamletHeader.HEADER_SIZE :]) + + @property + def raw(self) -> bytes: + """Returns the raw packet bytes""" + return self._header.raw + self.payload_data + + @property + def header(self) -> BeamletHeader: + """Returns the packet header""" + return self._header + + def payload_raw(self): + """Returns the payload as numpy array""" + bytecount_to_unsigned_struct_type = {1: "b", 2: "h", 4: "i", 8: "q"} + format_str = ( + f">{self.header.nof_statistics_per_packet}" + f"{bytecount_to_unsigned_struct_type[self.header.nof_bytes_per_statistic]}" + ) + + payload = numpy.array( + struct.unpack( + format_str, + self.payload_data, + ) + ) + + # Generate payload as linear array + if self.header.beamlet_width == 4: + payload.reshape( + self.header.nof_blocks_per_packet, + self.header.nof_beamlets_per_block, + N_POL, + ) + + return ( + payload + # TODO(Corne): Verify reshape can actually take > 2 arguments + .reshape( + self.header.nof_blocks_per_packet, + self.header.nof_beamlets_per_block, + N_POL, + N_COMPLEX, + ) + ) + + def payload(self): + """A property that should probably be removed""" + + if self.header.beamlet_width == 4: + # real in low 4 bits, imag in high 4 bits (both signed!) + payload_raw = self.payload_raw() + + # shift extends sign, so we prefer it over bitwise and + return (payload_raw << 4 >> 4) + (payload_raw >> 4) * 1j + + return self.payload_raw().astype(float).view(numpy.complex64) diff --git a/lofar_station_client/statistics/packets/_statistics.py b/lofar_station_client/statistics/packets/_statistics.py new file mode 100644 index 0000000000000000000000000000000000000000..d5ccba88430b79f8e541eb34c0ff7b615a8f247d --- /dev/null +++ b/lofar_station_client/statistics/packets/_statistics.py @@ -0,0 +1,317 @@ +# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) +# SPDX-License-Identifier: Apache-2.0 + +# pylint: disable=too-many-instance-attributes,duplicate-code + +""" Statistics related package definitions """ + +import struct +from datetime import datetime +from typing import TypeVar, Generic + +import numpy + +from ._base import _BasePacketHeader +from ._utils import get_bit_value + +N_POL = 2 +N_COMPLEX = 2 + +Subfields = TypeVar("Subfields") + + +class StatisticsHeader(Generic[Subfields], _BasePacketHeader): + """Models a UDP packet from SDP. + + Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). + + The following additional fields are exposed as properties & functions. + + subband_calibrated_flag 1 = subband data had subband calibration values applied, + 0 = not. + + data_id bit field with payload information, encoding several + other properties. + + nof_signal_inputs number of inputs that contributed to data in this packet. + nof_bytes_per_statistics word size of each statistic. + nof_statistics_per_packet number of statistic data points in the payload. + + integration_interval_raw integration interval, in block periods. + integration_interval integration interval, in seconds. + + """ + + def __init__(self, packet: bytes): + self.subband_calibrated_flag: int = 0 + self.nof_signal_inputs: int = 0 + self.nof_statistics_per_packet: int = 0 + self.nof_bytes_per_statistic: int = 0 + self.data_id: Subfields = None + self.data_id_raw: int = 0 + self.integration_interval_raw: int = 0 + + super().__init__(packet) + + # format string for the header, see unpack below + HEADER_FORMAT = ">cBL HH xBH LB BHHQ" + HEADER_SIZE = struct.calcsize(HEADER_FORMAT) + + @property + def raw(self) -> bytes: + return self.packet + + def unpack(self): + """Unpack the packet into properties of this object.""" + + # unpack fields + try: + ( + self.marker_raw, + self.version_id, + self.observation_id, + self.station_info, + self.source_info, + integration_interval_hi, + integration_interval_lo, + self.data_id_raw, + self.nof_signal_inputs, + self.nof_bytes_per_statistic, + self.nof_statistics_per_packet, + self.block_period_raw, + self.block_serial_number, + ) = struct.unpack(self.HEADER_FORMAT, self.packet[: self.HEADER_SIZE]) + except struct.error as ex: + raise ValueError("Error parsing statistics packet") from ex + + self.integration_interval_raw = ( + integration_interval_hi << 16 + ) + integration_interval_lo + # unpack the fields we just updated + + def valid_markers(self): + return [b"S", b"B", b"X"] + + def unpack_source_info(self): + """Unpack the source_info field into properties of this object.""" + super().unpack_source_info() + self.subband_calibrated_flag = get_bit_value(self.source_info, 8) != 0 + + def expected_size(self) -> int: + """Determine packet size (header + payload) from the header""" + + return ( + self.HEADER_SIZE + + self.nof_statistics_per_packet * self.nof_bytes_per_statistic + ) + + @property + def integration_interval(self) -> float: + """Returns the integration interval, in seconds.""" + + # Translate to seconds using the block period + return self.integration_interval_raw * self.block_period + + def __iter__(self): + """Return all the header fields as a dict.""" + for key, value in super().__iter__(): + if key == "source_info": + yield key, { + **value, + "subband_calibrated_flag": self.subband_calibrated_flag, + } + else: + yield key, value + yield "data_id", {**dict(self.data_id), "_raw": self.data_id_raw} + yield "integration_interval_raw", self.integration_interval_raw + yield "integration_interval", self.integration_interval + yield "nof_signal_inputs", self.nof_signal_inputs + yield "nof_bytes_per_statistic", self.nof_bytes_per_statistic + yield "nof_statistics_per_packet", self.nof_statistics_per_packet + + +class StatisticsPacket: + """Models a statistics UDP packet from SDP. + + Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). + """ + + def __init__(self, header: StatisticsHeader, payload: bytes): + self._header: StatisticsHeader = header + self.payload_data: bytes = payload + + def __new__(cls, header: StatisticsHeader, packet_data: bytes): + if cls == StatisticsPacket: + return PACKET_CLASS_FOR_MARKER[header.marker_raw](header, packet_data) + + return super().__new__(cls) + + def __iter__(self): + yield from self.header + yield "payload", self.payload() + + @staticmethod + def parse_packet(packet_data: bytes) -> "StatisticsPacket": + """Parses bytes into the corresponding StatisticsPacket""" + header = StatisticsHeader(packet_data[0 : StatisticsHeader.HEADER_SIZE]) + return StatisticsPacket(header, packet_data[StatisticsHeader.HEADER_SIZE :]) + + @property + def raw(self) -> bytes: + """Returns the raw packet bytes""" + return self._header.raw + self.payload_data + + @property + def header(self) -> StatisticsHeader: + """Returns the packet header""" + return self._header + + @property + def timestamp(self) -> datetime: + """Returns the packet timestamp""" + return self.header.timestamp + + def payload(self, signed=False) -> numpy.array: + """The payload of this packet, as a linear array.""" + + # derive which and how many elements to read from the packet header + bytecount_to_unsigned_struct_type = ( + {1: "b", 2: "h", 4: "i", 8: "q"} + if signed + else {1: "B", 2: "H", 4: "I", 8: "Q"} + ) + format_str = ( + f">{self.header.nof_statistics_per_packet}" + f"{bytecount_to_unsigned_struct_type[self.header.nof_bytes_per_statistic]}" + ) + + return numpy.array( + struct.unpack( + format_str, + self.payload_data, + ) + ) + + +class SSTPacket(StatisticsPacket): + """Models an SST statistics UDP packet from SDP. + + The following fields are exposed as properties & functions. + + signal_input_index: + input (antenna polarisation) index for which this packet contains statistics + + payload[nof_statistics_per_packet]: + SST statistics, an array of amplitudes per subband. + """ + + # pylint: disable=too-few-public-methods + class SSTHeaderSubFields: + """SST packet header subfields""" + + def __init__(self, data_id): + self.signal_input_index = get_bit_value(data_id, 0, 7) + + def __iter__(self): + yield "signal_input_index", self.signal_input_index + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._header.data_id = SSTPacket.SSTHeaderSubFields(self._header.data_id_raw) + + @property + def header(self) -> StatisticsHeader[SSTHeaderSubFields]: + """Returns the packet header""" + return super().header + + def payload(self, signed=True): + """Returns the packet payload""" + return super().payload(signed=False) + + +class XSTPacket(StatisticsPacket): + """Models an XST statistics UDP packet from SDP. + + The following fields are exposed as properties & functions. + + subband_index: + subband number for which this packet contains statistics. + first_baseline: + first antenna pair for which this packet contains statistics. + + payload[nof_signal_inputs][nof_signal_inputs] the baselines, starting from + first_baseline + """ + + # pylint: disable=too-few-public-methods + class XstHeaderSubFields: + """XST packet header subfields""" + + def __init__(self, data_id): + self.subband_index = get_bit_value(data_id, 16, 24) + self.first_baseline = ( + get_bit_value(data_id, 8, 15), + get_bit_value(data_id, 0, 7), + ) + + def __iter__(self): + yield "subband_index", self.subband_index + yield "first_baseline", self.first_baseline + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._header.data_id = XSTPacket.XstHeaderSubFields(self._header.data_id_raw) + + @property + def header(self) -> StatisticsHeader[XstHeaderSubFields]: + """Returns the packet header""" + return super().header + + def payload(self, signed=True): + """Returns the packet payload""" + return super().payload(signed=True) + + +class BSTPacket(StatisticsPacket): + """Models an BST statistics UDP packet from SDP. + + The following fields are exposed as properties & functions. + + beamlet_index: + the number of the beamlet for which this packet holds statistics. + """ + + # pylint: disable=too-few-public-methods + class BstHeaderSubFields: + """BST packet header subfields""" + + def __init__(self, data_id): + self.beamlet_index = get_bit_value(data_id, 0, 15) + + def __iter__(self): + yield "beamlet_index", self.beamlet_index + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._header.data_id = BSTPacket.BstHeaderSubFields(self._header.data_id_raw) + + @property + def header(self) -> StatisticsHeader[BstHeaderSubFields]: + """Returns the packet header""" + return super().header + + def payload(self, signed=True): + # We have signed values, per beamlet in pairs + # for each polarisation. + return super().payload(signed=True).reshape(-1, N_POL) + + +# Which class to use for which marker. +# +# NB: Python does not allow us to register from inside the class, +# as we cannot reference the class during its construction. +PACKET_CLASS_FOR_MARKER = { + b"S": SSTPacket, + b"B": BSTPacket, + b"X": XSTPacket, +} diff --git a/lofar_station_client/statistics/packets/_utils.py b/lofar_station_client/statistics/packets/_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..c3fa4ea04abec060957000c7176cbd49faade09d --- /dev/null +++ b/lofar_station_client/statistics/packets/_utils.py @@ -0,0 +1,20 @@ +# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) +# SPDX-License-Identifier: Apache-2.0 + +""" Module wide utils """ + + +def get_bit_value(value: int, first_bit: int, last_bit: int = None) -> int: + """Return bits [first_bit:last_bit] from value as integer. Bit 0 = LSB. + + For example, extracting bits 2-3 from b'01100' returns 11 binary = 3 decimal: + get_bit_value(b'01100', 2, 3) == 3 + + If 'last_bit' is not given, just the value of bit 'first_bit' is returned. + """ + + # default last_bit to first_bit + if last_bit is None: + last_bit = first_bit + + return value >> first_bit & ((1 << (last_bit - first_bit + 1)) - 1) diff --git a/lofar_station_client/statistics/receiver.py b/lofar_station_client/statistics/receiver.py index 7869ec1e671a2dafa4626a201e1c71f7c31168a9..3db901ec05e41d75355b15fdf9cd3f3ea29f5d11 100644 --- a/lofar_station_client/statistics/receiver.py +++ b/lofar_station_client/statistics/receiver.py @@ -5,8 +5,9 @@ import os import socket +from urllib.parse import urlparse -from lofar_station_client.statistics.packet import StatisticsPacket +from lofar_station_client.statistics.packets import StatisticsPacket, StatisticsHeader class Receiver: @@ -32,20 +33,15 @@ class Receiver: """Read exactly one statistics packet from the TCP connection.""" # read only the header, to compute the size of the packet - header = self.read_data(self.HEADER_LENGTH) - packet = StatisticsPacket(header) + header_data = self.read_data(self.HEADER_LENGTH) + header = StatisticsHeader(header_data) # read the rest of the packet (payload) - payload_length = packet.expected_size() - len(header) + payload_length = header.expected_size() - len(header_data) payload = self.read_data(payload_length) - # add payload to the header, and construct the full packet - packet = StatisticsPacket(header + payload) - - # return a more specialised class based on its type - packet = packet.downcast() - - return packet + # add payload to the header, and return the full packet + return StatisticsPacket(header, payload) def _read(self, length: int) -> bytes: """Low-level read function to fetch at most "length" (>1) bytes. Returns @@ -113,3 +109,13 @@ class FileReceiver(Receiver): def __del__(self): os.close(self.fileno) + + +def create(uri): + """Create a Receiver based on the given URI""" + parsed = urlparse(uri) + if parsed.scheme == "tcp": + return TCPReceiver(parsed.hostname, parsed.port) + if parsed.scheme == "file": + return FileReceiver(parsed.path) + raise ValueError(f"Provided uri '{uri}' is not supported") diff --git a/lofar_station_client/statistics/writer/entry.py b/lofar_station_client/statistics/writer/entry.py index 3a198d62f45b1031afccfef2e2757a2e8417f5f1..0f69f10eb67b78805ea5dafbaf1c50139bf7bd7a 100644 --- a/lofar_station_client/statistics/writer/entry.py +++ b/lofar_station_client/statistics/writer/entry.py @@ -11,8 +11,8 @@ import argparse import logging import sys import time -from typing import Dict, NamedTuple from pathlib import Path +from typing import Dict, NamedTuple from tango import DeviceProxy @@ -226,7 +226,7 @@ def _start_loop(receiver, writer, reconnect, filename): try: while True: for packet in receiver: - writer.next_packet(packet.packet) + writer.next_packet(packet) if filename or not reconnect: break diff --git a/lofar_station_client/statistics/writer/hdf5.py b/lofar_station_client/statistics/writer/hdf5.py index f33489e84c1c0fd89cfec5f0e4efd350022d340d..3d6caf46e9896819fe66aefe8b0ea6a27c8a653a 100644 --- a/lofar_station_client/statistics/writer/hdf5.py +++ b/lofar_station_client/statistics/writer/hdf5.py @@ -17,23 +17,26 @@ from typing import TypeVar, List, Dict import h5py import numpy import pytz -from tango import DeviceProxy, DevFailed from lofar_station_client import __version__ as lsc_version +from lofar_station_client._utils import sys_exit_no_tango from lofar_station_client.dts.constants import A_pn, N_pol -from lofar_station_client.file_access.hdf._hdf_writers import HdfFileWriter, create_hdf5 +from lofar_station_client.file_access import FileWriter, create_hdf5 from lofar_station_client.statistics import writer as stats_writer from lofar_station_client.statistics.collector import BSTCollector from lofar_station_client.statistics.collector import SSTCollector from lofar_station_client.statistics.collector import XSTCollector -from lofar_station_client.statistics.packet import BSTPacket -from lofar_station_client.statistics.packet import SSTPacket -from lofar_station_client.statistics.packet import XSTPacket +from lofar_station_client.statistics.packets import StatisticsPacket from lofar_station_client.statistics.statistics_data import ( StatisticsDataFile, StatisticsData, ) +try: + from tango import DeviceProxy, DevFailed +except ImportError: + sys_exit_no_tango() + logger = logging.getLogger("statistics_writer") T = TypeVar("T") @@ -80,13 +83,13 @@ def _get_client_version() -> str: return lsc_version or "" -def _dict_to_hdf5_attrs(value_dict: dict) -> dict: +def _dict_to_hdf5_attrs(value_dict) -> dict: """Convert a dictionary of values into a dictionary that cna be merged with h5py's "attr" property.""" attrs = {} - for k, val in value_dict.items(): + for k, val in dict(value_dict).items(): if isinstance(val, dict): # flatten any hierarchical structure for subk, subv in _dict_to_hdf5_attrs(val).items(): @@ -149,7 +152,7 @@ class HDF5Writer(ABC): ): # all variables that deal with the matrix that's currently being decoded self.file: StatisticsDataFile = None - self.file_writer: HdfFileWriter[StatisticsDataFile] = None + self.file_writer: FileWriter[StatisticsDataFile] = None self.current_collector = None self.current_timestamp = datetime.min.replace(tzinfo=pytz.UTC) @@ -255,15 +258,15 @@ class HDF5Writer(ABC): except (DevFailed, AttributeError): logger.exception("Failed to read from %s", self.sdp_device.name()) - @abstractmethod def decoder(self, packet): - """Abstract method""" + """Decodes bytes into a StatisticsPacket""" + return StatisticsPacket.parse_packet(packet) @abstractmethod def new_collector(self): """Abstract method""" - def next_packet(self, packet): + def next_packet(self, statistics_packet: StatisticsPacket): """ All statistics packets come with a timestamp of the time they were @@ -277,18 +280,15 @@ class HDF5Writer(ABC): close the current matrix, store it and start a new one. """ - # process the packet - statistics_packet = self.decoder(packet) - if not self.statistics_packet_header: - self.statistics_packet_header = statistics_packet.header() + self.statistics_packet_header = statistics_packet.header # grab the timestamp - statistics_timestamp = statistics_packet.timestamp() + statistics_timestamp = statistics_packet.timestamp # ignore packets with no timestamp, as they indicate FPGA processing was # disabled and are useless anyway. - if statistics_packet.block_serial_number == 0: + if statistics_packet.header.block_serial_number == 0: logger.warning("Received statistics with no timestamp. Packet dropped.") return @@ -308,7 +308,7 @@ class HDF5Writer(ABC): self.start_new_matrix(statistics_timestamp) self.current_timestamp = statistics_timestamp - self.process_packet(packet) + self.process_packet(statistics_packet) def start_new_matrix(self, timestamp): """ @@ -472,7 +472,7 @@ class HDF5Writer(ABC): f"{self.file_location}/{self.mode}_{time_str}_{self.antennafield}{suffix}" ) - def process_packet(self, packet): + def process_packet(self, packet: StatisticsPacket): """ Adds the newly received statistics packet to the statistics matrix """ @@ -554,9 +554,6 @@ class SstHdf5Writer(HDF5Writer): self.nr_signal_inputs = nr_signal_inputs self.first_signal_input_index = first_signal_input_index - def decoder(self, packet): - return SSTPacket(packet) - def new_collector(self): return SSTCollector(self.nr_signal_inputs, self.first_signal_input_index) @@ -598,9 +595,6 @@ class BstHdf5Writer(HDF5Writer): devices=devices, ) - def decoder(self, packet): - return BSTPacket(packet) - def new_collector(self): return BSTCollector() @@ -655,9 +649,6 @@ class XstHdf5Writer(HDF5Writer): self.nr_signal_inputs = nr_signal_inputs self.first_signal_input_index = first_signal_input_index - def decoder(self, packet): - return XSTPacket(packet) - def new_collector(self): return XSTCollector(self.nr_signal_inputs, self.first_signal_input_index, 1) @@ -725,18 +716,16 @@ class ParallelXstHdf5Writer: self.new_writer = new_writer - def next_packet(self, packet): + def next_packet(self, statistics_packet: StatisticsPacket): """Gets next packet""" - # decode to get subband of this packet - fields = XSTPacket(packet) - subband = fields.subband_index + subband = statistics_packet.header.data_id.subband_index # make sure there is a writer for it if subband not in self.writers: self.writers[subband] = self.new_writer(subband) # demux packet to the correct writer - self.writers[subband].next_packet(packet) + self.writers[subband].next_packet(statistics_packet) def close_writer(self): """Closes writer""" diff --git a/requirements-tango.txt b/requirements-tango.txt new file mode 100644 index 0000000000000000000000000000000000000000..3b2e762f751fcc925629a2c1985d5a453d95d51c --- /dev/null +++ b/requirements-tango.txt @@ -0,0 +1 @@ +PyTango>=9.4.0 # LGPL v3 diff --git a/requirements.txt b/requirements.txt index ffbe2f6b2178d1e4257ee91b89a05187fa8a0df4..5e84983170cff2ff5b1a7a7025fa3d6680c58cb3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,6 @@ # Order does matter importlib-metadata>=0.12, <5.0;python_version<"3.8" pip>=1.5 -PyTango>=9.4.0 # LGPL v3 requests>=2.0 # Apache 2 numpy>=1.21.0 # BSD nptyping>=2.3.0 # MIT diff --git a/setup.cfg b/setup.cfg index f9512d75824240229d9eccfaa324ff6ae49ad0d5..51a18ca4562af1bcca34e79a087d56300e0f88c4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,6 +24,7 @@ classifiers = Programming Language :: Python :: 3.9 Programming Language :: Python :: 3.10 Programming Language :: Python :: 3.11 + Programming Language :: Python :: 3.12 Topic :: Scientific/Engineering Topic :: Scientific/Engineering :: Astronomy @@ -33,6 +34,9 @@ packages = find: python_requires = >=3.7 install_requires = file: requirements.txt +[options.extras_require] +tango = file: requirements-tango.txt + [options.entry_points] console_scripts = l2ss-statistics-reader = lofar_station_client.statistics.reader:main diff --git a/setup.py b/setup.py index b908cbe55cb344569d32de1dfc10ca7323828dc5..871aeecda8196d62e2577e0c9350c198f521e98b 100644 --- a/setup.py +++ b/setup.py @@ -1,3 +1,6 @@ +# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) +# SPDX-License-Identifier: Apache-2.0 + import setuptools setuptools.setup() diff --git a/tests/statistics/test_collector.py b/tests/statistics/test_collector.py index 8a53f89b7cfdef9bbfa4e95ce419f0c7948d63cf..6873d9e5774c2085c09359434868e0f4f682ad1d 100644 --- a/tests/statistics/test_collector.py +++ b/tests/statistics/test_collector.py @@ -5,8 +5,7 @@ import numpy from lofar_station_client.statistics.collector import BSTCollector from lofar_station_client.statistics.collector import XSTCollector -from lofar_station_client.statistics.packet import BSTPacket -from lofar_station_client.statistics.packet import XSTPacket +from lofar_station_client.statistics.packets import StatisticsPacket from tests import base @@ -58,21 +57,21 @@ class TestXSTCollector(base.TestCase): # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at # (12,0) - packet = ( + packet_data = ( b"X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x00\x0c" b"\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3" + 288 * b"\x00\x00\x00\x00\x00\x00\x00\x01" ) # parse it ourselves to extract info nicely - fields = XSTPacket(packet) - fpga_index = fields.gn_index + packet = StatisticsPacket.parse_packet(packet_data) + fpga_index = packet.header.gn_index # baseline indeed should be (12,0) - self.assertEqual((12, 0), fields.first_baseline) + self.assertEqual((12, 0), packet.header.data_id.first_baseline) # subband should indeed be 102 - self.assertEqual(102, fields.subband_index) + self.assertEqual(102, packet.header.data_id.subband_index) # this should not throw collector.process_packet(packet) @@ -99,14 +98,16 @@ class TestXSTCollector(base.TestCase): continue baseline_a_was_in_packet = ( - fields.first_baseline[0] + packet.header.data_id.first_baseline[0] <= baseline_a - < fields.first_baseline[0] + fields.nof_signal_inputs + < packet.header.data_id.first_baseline[0] + + packet.header.nof_signal_inputs ) baseline_b_was_in_packet = ( - fields.first_baseline[1] + packet.header.data_id.first_baseline[1] <= baseline_b - < fields.first_baseline[1] + fields.nof_signal_inputs + < packet.header.data_id.first_baseline[1] + + packet.header.nof_signal_inputs ) if baseline_a_was_in_packet and baseline_b_was_in_packet: @@ -132,7 +133,7 @@ class TestXSTCollector(base.TestCase): # a valid packet as obtained from SDP. # the first 72 samples are 1+1j, the second 72 samples are 2+2j (64-bit BE). # at baseline (0,12) VV VV - packet = ( + packet_data = ( b"X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x00\x0c\x0c" b"\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3" + 144 * b"\x00\x00\x00\x00\x00\x00\x00\x01" @@ -140,10 +141,10 @@ class TestXSTCollector(base.TestCase): ) # parse it ourselves to extract info nicely - fields = XSTPacket(packet) + packet = StatisticsPacket.parse_packet(packet_data) # baseline indeed should be (0,12) - self.assertEqual((0, 12), fields.first_baseline) + self.assertEqual((0, 12), packet.header.data_id.first_baseline) # this should not throw collector.process_packet(packet) @@ -169,21 +170,23 @@ class TestXSTCollector(base.TestCase): # use swapped indices! baseline_a_was_in_packet = ( - fields.first_baseline[1] + packet.header.data_id.first_baseline[1] <= baseline_a - < fields.first_baseline[1] + fields.nof_signal_inputs + < packet.header.data_id.first_baseline[1] + + packet.header.nof_signal_inputs ) baseline_b_was_in_packet = ( - fields.first_baseline[0] + packet.header.data_id.first_baseline[0] <= baseline_b - < fields.first_baseline[0] + fields.nof_signal_inputs + < packet.header.data_id.first_baseline[0] + + packet.header.nof_signal_inputs ) if baseline_a_was_in_packet and baseline_b_was_in_packet: # through conjugation, the imaginary part is made negative # through transposition, the second dimension (b) now is the divider # between the two distinct values - if baseline_b - fields.first_baseline[0] < 6: + if baseline_b - packet.header.data_id.first_baseline[0] < 6: self.assertEqual( 1 - 1j, xst_values[baseline_a][baseline_b], @@ -218,23 +221,23 @@ class TestXSTCollector(base.TestCase): collector = XSTCollector() # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at (12,0) - packet_subband_102 = ( + packet_subband_102_data = ( b"X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x00\x0c" b"\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3" + 288 * b"\x00\x00\x00\x00\x00\x00\x00\x01" ) - packet_subband_103 = ( + packet_subband_103_data = ( b"X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00g\x0c\x00\x0c" b"\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3" + 288 * b"\x00\x00\x00\x00\x00\x00\x00\x02" ) # make sure the subband_indices are indeed what we claim they are - fields = XSTPacket(packet_subband_102) - self.assertEqual(102, fields.subband_index) + packet_subband_102 = StatisticsPacket.parse_packet(packet_subband_102_data) + self.assertEqual(102, packet_subband_102.header.data_id.subband_index) - fields = XSTPacket(packet_subband_103) - self.assertEqual(103, fields.subband_index) + packet_subband_103 = StatisticsPacket.parse_packet(packet_subband_103_data) + self.assertEqual(103, packet_subband_103.header.data_id.subband_index) # process our packets collector.process_packet(packet_subband_102) @@ -257,14 +260,16 @@ class TestXSTCollector(base.TestCase): continue baseline_a_was_in_packet = ( - fields.first_baseline[0] + packet_subband_103.header.data_id.first_baseline[0] <= baseline_a - < fields.first_baseline[0] + fields.nof_signal_inputs + < packet_subband_103.header.data_id.first_baseline[0] + + packet_subband_103.header.nof_signal_inputs ) baseline_b_was_in_packet = ( - fields.first_baseline[1] + packet_subband_103.header.data_id.first_baseline[1] <= baseline_b - < fields.first_baseline[1] + fields.nof_signal_inputs + < packet_subband_103.header.data_id.first_baseline[1] + + packet_subband_103.header.nof_signal_inputs ) baseline_in_pk = ( @@ -298,12 +303,13 @@ class TestXSTCollector(base.TestCase): # an invalid packet # V - packet = ( + packet_data = ( b"S\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x00\x00\x0c" b"\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3" + 288 * b"\x00\x00\x00\x00\x00\x00\x00\x01" ) + packet = StatisticsPacket.parse_packet(packet_data) # this should throw with self.assertRaises(ValueError): collector.process_packet(packet) @@ -324,15 +330,15 @@ class TestXSTCollector(base.TestCase): # a valid packet with a payload error # V - packet = ( + packet_data = ( b"X\x05\x00\x00\x00\x00\x00\x00\x14\x08\x00\x02\xfa\xef\x00f\x00\x00\x0c" b"\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3" + 288 * b"\x00\x00\x00\x00\x00\x00\x00\x01" ) # parse it ourselves to extract info nicely - fields = XSTPacket(packet) - fpga_index = fields.gn_index + packet = StatisticsPacket.parse_packet(packet_data) + fpga_index = packet.header.gn_index # this should not throw collector.process_packet(packet) @@ -353,18 +359,19 @@ class TestXSTCollector(base.TestCase): # a valid packet as obtained from SDP, with 64-bit BE 1+1j as payload at # (12,12) - packet = ( + packet_data = ( b"X\x05\x00\x00\x00\x00\x00\x00\x10\x08\x00\x02\xfa\xef\x00f\x0c\x0c\x0c" b"\x08\x01 \x14\x00\x00\x01!\xd9&z\x1b\xb3" + 288 * b"\x00\x00\x00\x00\x00\x00\x00\x01" ) # parse it ourselves to extract info nicely - fields = XSTPacket(packet) + packet = StatisticsPacket.parse_packet(packet_data) # verify we constructed a useful packet for this test self.assertEqual( - (first_signal_input, first_signal_input), fields.first_baseline + (first_signal_input, first_signal_input), + packet.header.data_id.first_baseline, ) # this should not throw @@ -404,18 +411,18 @@ class TestBSTCollector(base.TestCase): collector = BSTCollector() # a valid packet as obtained from DTS outside - packet = ( + packet_data = ( b"B\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00\x00\x00" b"\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e" + 1952 * b"\0\0\0\0" ) # parse it ourselves to extract info nicely - fields = BSTPacket(packet) - fpga_index = fields.gn_index + packet = StatisticsPacket.parse_packet(packet_data) + fpga_index = packet.header.gn_index # beamlet_index should be zero - self.assertEqual(0, fields.beamlet_index) + self.assertEqual(0, packet.header.data_id.beamlet_index) # this should not throw collector.process_packet(packet) @@ -445,12 +452,13 @@ class TestBSTCollector(base.TestCase): # an invalid packet # V - packet = ( + packet_data = ( b"S\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00\x00\x00" b"\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e" + 1952 * b"\0\0\0\0" ) + packet = StatisticsPacket.parse_packet(packet_data) # this should throw with self.assertRaises(ValueError): collector.process_packet(packet) @@ -470,15 +478,15 @@ class TestBSTCollector(base.TestCase): collector = BSTCollector() # a valid packet with a payload error - packet = ( + packet_data = ( b"B\x05\x00\x00\x00\x00\x03\x87\x14\x00\x00\x02\xfa\xf0\x00\x00\x00\x00" b"\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e" + 1952 * b"\0\0\0\0" ) # parse it ourselves to extract info nicely - fields = BSTPacket(packet) - fpga_index = fields.gn_index + packet = StatisticsPacket.parse_packet(packet_data) + fpga_index = packet.header.gn_index # this should not throw collector.process_packet(packet) @@ -496,11 +504,12 @@ class TestBSTCollector(base.TestCase): # An otherwise valid packet that has its beamlet_index set to 4096. This should # throw an error as the max value is 962 * 3 - packet = ( + packet_data = ( b"B\x05\x00\x00\x00\x00\x03\x87\x11\x00\x00\x02\xfa\xf0\x00\x00" b"\x10\x00\x00\x08\x03\xd0\x14\x00\x00\x01%\xa3\xc7\xb1\x9e\x8e" + 1952 * b"\0\0\0\0" ) + packet = StatisticsPacket.parse_packet(packet_data) with self.assertRaises(ValueError): collector.process_packet(packet) diff --git a/tox.ini b/tox.ini index 962e79fc2925721a20a65c69d2078c11a75da5e1..8dd13c8bebbd9db79c5fd9ed2c53a28c78d8da7d 100644 --- a/tox.ini +++ b/tox.ini @@ -1,7 +1,7 @@ [tox] min_version = 4.3.3 # Generative environment list to test all supported Python versions -envlist = black,pep8,pylint,py3{7,8,9,10},docs +envlist = black,pep8,pylint,py3{7,8,9,10,11,12},docs [testenv] usedevelop = True @@ -13,6 +13,7 @@ setenv = PYTHONWARNINGS=default::DeprecationWarning deps = -r{toxinidir}/requirements.txt + -r{toxinidir}/requirements-tango.txt -r{toxinidir}/test-requirements.txt commands_pre = {envpython} --version @@ -59,6 +60,7 @@ allowlist_externals = sh deps = -r{toxinidir}/requirements.txt + -r{toxinidir}/requirements-tango.txt -r{toxinidir}/docs/requirements.txt extras = docs commands =