diff --git a/tangostationcontrol/setup.cfg b/tangostationcontrol/setup.cfg index cfc0a249b75a93c5fc80b4dc892434f779581e8e..a1fe2672594f706a8c58e0a652c8c814c021180c 100644 --- a/tangostationcontrol/setup.cfg +++ b/tangostationcontrol/setup.cfg @@ -53,7 +53,7 @@ console_scripts = l2ss-cold-start = tangostationcontrol.toolkit.lts_cold_start:main l2ss-hardware-device-template = tangostationcontrol.examples.HW_device_template:main l2ss-ini-device = tangostationcontrol.examples.load_from_disk.ini_device:main - l2ss-parse-statistics-packet = tangostationcontrol.devices.sdp.statistics_packet:main + l2ss-parse-sdp-packet = tangostationcontrol.devices.sdp.packet:main l2ss-random-data = tangostationcontrol.test.devices.random_data:main l2ss-snmp = tangostationcontrol.examples.snmp.snmp:main l2ss-version = tangostationcontrol.common.lofar_version:main diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_packet.py b/tangostationcontrol/tangostationcontrol/devices/sdp/packet.py similarity index 58% rename from tangostationcontrol/tangostationcontrol/devices/sdp/statistics_packet.py rename to tangostationcontrol/tangostationcontrol/devices/sdp/packet.py index 59c74e296c2eebdd677d448bdae523be8d149934..611017659302f0cd0e346e2c3265e002bc52faf8 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_packet.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/packet.py @@ -2,8 +2,10 @@ import struct from datetime import datetime, timezone import numpy -__all__ = ["StatisticsPacket", "SSTPacket", "XSTPacket", "BSTPacket"] +__all__ = ["StatisticsPacket", "SDPPacket", "SSTPacket", "XSTPacket", "BSTPacket", "read_packet", "PACKET_CLASS_FOR_MARKER"] +N_POL = 2 +N_COMPLEX = 2 def get_bit_value(value: bytes, first_bit: int, last_bit: int = None) -> int: """ Return bits [first_bit:last_bit] from value, and return their integer value. Bit 0 = LSB. @@ -20,9 +22,9 @@ def get_bit_value(value: bytes, first_bit: int, last_bit: int = None) -> int: return value >> first_bit & ((1 << (last_bit - first_bit + 1)) - 1) -class StatisticsPacket(object): +class SDPPacket(object): """ - Models a statistics UDP packet from SDP. + Models a UDP packet from SDP. Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). @@ -46,17 +48,8 @@ class StatisticsPacket(object): fsub_type: sampling method. 0 = critically sampled, 1 = oversampled. payload_error: 0 = data is ok, 1 = data is corrupted (a fault was encountered). beam_repositioning_flag: 0 = data is ok, 1 = beam got repositioned during packet construction (BST only). - subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not. gn_index: global index of FPGA that emitted this packet. - data_id: bit field with payload information, encoding several other properties. - - nof_signal_inputs: number of inputs that contributed to data in this packet. - nof_bytes_per_statistics: word size of each statistic. - nof_statistics_per_packet: number of statistic data points in the payload. - - integration_interval_raw: integration interval, in block periods. - integration_interval(): integration interval, in seconds. block_period_raw: block period, in nanoseconds. block_period(): block period, in seconds. block_serial_number: timestamp of the data, in block periods since 1970. @@ -70,15 +63,26 @@ class StatisticsPacket(object): self.unpack() # Only parse valid statistics packets from SDP, reject everything else - if self.marker_raw not in b'SBX': + if self.marker_raw not in self.valid_markers(): raise ValueError( - "Invalid SDP statistics packet: packet marker (first byte) is {}, not one of 'SBX'.".format( - self.marker)) + f"Invalid packet of type {self.__class__.__name__}: packet marker (first byte) is {self.marker}, not one of {self.valid_markers()}.") # format string for the header, see unpack below - header_format = ">cBL HHB BHL BBH HQ" + header_format = ">cBL HH xxxxx xxxxxxx HQ" header_size = struct.calcsize(header_format) + @classmethod + def valid_markers(cls): + """ Valid values for the 'marker_raw' header field for this class. + + Each new packet class that introduces a new marker should be added + to the PACKET_CLASS_FOR_MARKER registry, which holds the mapping + marker -> packet class. + """ + + # return all markers registered in PACKET_CLASS_FOR_MARKER which are for this class or any of its specialisations + return [marker for marker, klass in PACKET_CLASS_FOR_MARKER.items() if issubclass(klass, cls)] + def unpack(self): """ Unpack the packet into properties of this object. """ @@ -89,25 +93,13 @@ class StatisticsPacket(object): self.observation_id, self.station_id, self.source_info, - # reserved byte - _, - # integration interval, in block periods. This field is 3 bytes, big endian -- combine later - integration_interval_hi, - integration_interval_lo, - self.data_id, - self.nof_signal_inputs, - self.nof_bytes_per_statistic, - self.nof_statistics_per_packet, self.block_period_raw, self.block_serial_number) = struct.unpack(self.header_format, self.packet[:self.header_size]) - - self.integration_interval_raw = (integration_interval_hi << 16) + integration_interval_lo except struct.error as e: raise ValueError("Error parsing statistics packet") from e # unpack the fields we just updated self.unpack_source_info() - self.unpack_data_id() def unpack_source_info(self): """ Unpack the source_info field into properties of this object. """ @@ -118,20 +110,19 @@ class StatisticsPacket(object): self.fsub_type = get_bit_value(self.source_info, 11) self.payload_error = (get_bit_value(self.source_info, 10) != 0) self.beam_repositioning_flag = (get_bit_value(self.source_info, 9) != 0) - self.subband_calibrated_flag = (get_bit_value(self.source_info, 8) != 0) - # self.source_info 5-7 are reserved + # self.source_info 5-8 are reserved self.gn_index = get_bit_value(self.source_info, 0, 4) - def unpack_data_id(self): - """ Unpack the data_id field into properties of this object. """ - - # only useful in specialisations (XST/SST/BST) - pass - def expected_size(self) -> int: """ The size this packet should be (header + payload), according to the header. """ - return self.header_size + self.nof_statistics_per_packet * self.nof_bytes_per_statistic + # the generic header does not contain enough information to determine the payload size + raise NotImplementedError + + def size(self) -> int: + """ The actual size of this packet. """ + + return len(self.packet) @property def marker(self) -> str: @@ -140,6 +131,7 @@ class StatisticsPacket(object): 'S' = SST 'B' = BST 'X' = XST + 'b' = beamlet """ try: @@ -151,12 +143,6 @@ class StatisticsPacket(object): # which the constructor will refuse to accept. return self.marker_raw - def integration_interval(self) -> float: - """ Returns the integration interval, in seconds. """ - - # Translate to seconds using the block period - return self.integration_interval_raw * self.block_period() - def block_period(self) -> float: """ Return the block period, in seconds. """ @@ -190,17 +176,8 @@ class StatisticsPacket(object): "fsub_type": self.fsub_type, "payload_error": self.payload_error, "beam_repositioning_flag": self.beam_repositioning_flag, - "subband_calibrated_flag": self.subband_calibrated_flag, "gn_index": self.gn_index, }, - "data_id": { - "_raw": self.data_id, - }, - "integration_interval_raw": self.integration_interval_raw, - "integration_interval": self.integration_interval(), - "nof_signal_inputs": self.nof_signal_inputs, - "nof_bytes_per_statistic": self.nof_bytes_per_statistic, - "nof_statistics_per_packet": self.nof_statistics_per_packet, "block_period_raw": self.block_period_raw, "block_period": self.block_period(), "block_serial_number": self.block_serial_number, @@ -221,6 +198,207 @@ class StatisticsPacket(object): struct.unpack(format_str, self.packet[self.header_size:self.header_size + struct.calcsize(format_str)])) +class BeamletPacket(SDPPacket): + """ + Models a beamlet UDP packet from SDP. + + Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). + + The following additional fields are exposed as properties & functions. + + beamlet_width: bits/beamlet (4, 8, 16). + + beamlet_scale: scaling of beamlets prior to quantisation. + beamlet_index: index of the first beamlet in the payload. + nof_blocks_per_packet: number of blocks (timestamps) of data in the payload. + nof_beamlets_per_block: number of beamlets in the payload. + """ + # format string for the header, see unpack below + header_format = ">cBL HH xxxxx HHBHHQ" + header_size = struct.calcsize(header_format) + + def unpack(self): + """ Unpack the packet into properties of this object. """ + + # unpack fields + try: + (self.marker_raw, + self.version_id, + self.observation_id, + self.station_id, + self.source_info, + self.beamlet_scale, + self.beamlet_index, + self.nof_blocks_per_packet, + self.nof_beamlets_per_block, + self.block_period_raw, + self.block_serial_number) = struct.unpack(self.header_format, self.packet[:self.header_size]) + except struct.error as e: + raise ValueError("Error parsing beamlet packet") from e + + # unpack the fields we just updated + self.unpack_source_info() + + def unpack_source_info(self): + """ Unpack the source_info field into properties of this object. """ + + super().unpack_source_info() + + self.beamlet_width = get_bit_value(self.source_info, 5, 8) or 16 # 0 -> 16 + + def expected_size(self) -> int: + """ The size this packet should be (header + payload), according to the header. """ + + return self.header_size + self.nof_statistics_per_packet * self.nof_bytes_per_statistic + + @property + def nof_statistics_per_packet(self) -> int: + return self.nof_blocks_per_packet * self.nof_beamlets_per_block * N_POL * N_COMPLEX * self.beamlet_width // 8 + + @property + def nof_bytes_per_statistic(self) -> int: + if self.beamlet_width == 8: + # cint8 data [-127, 127] + return 1 + elif self.beamlet_width == 16: + # cint16 data [-32767, 32767] + return 2 + elif self.beamlet_width == 4: + # cint4 data [-7, 7], packet in 1 byte + return 1 + + def header(self) -> dict: + """ Return all the header fields as a dict. """ + + header = super().header() + + header["source_info"]["beamlet_width"] = self.beamlet_width + + header.update({ + "beamlet_scale": self.beamlet_scale, + "beamlet_index": self.beamlet_index, + "nof_blocks_per_packet": self.nof_blocks_per_packet, + "nof_beamlets_per_block": self.nof_beamlets_per_block + }) + + return header + + def payload_raw(self): + if self.beamlet_width == 4: + return super().payload(signed=True).reshape(self.nof_blocks_per_packet, self.nof_beamlets_per_block, N_POL) + else: + return super().payload(signed=True).reshape(self.nof_blocks_per_packet, self.nof_beamlets_per_block, N_POL, N_COMPLEX) + + @property + def payload(self): + if self.beamlet_width == 4: + # real in low 4 bits, imag in high 4 bits (both signed!) + payload_raw = self.payload_raw() + + # shift extends sign, so we prefer it over bitwise and + return (payload_raw << 4 >> 4) + (payload_raw >> 4) * 1j + else: + return self.payload_raw().astype(float).view(numpy.complex64) + + +class StatisticsPacket(SDPPacket): + """ + Models a statistics UDP packet from SDP. + + Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). + + The following additional fields are exposed as properties & functions. + + subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not. + + data_id: bit field with payload information, encoding several other properties. + + nof_signal_inputs: number of inputs that contributed to data in this packet. + nof_bytes_per_statistics: word size of each statistic. + nof_statistics_per_packet: number of statistic data points in the payload. + + integration_interval_raw: integration interval, in block periods. + integration_interval(): integration interval, in seconds. + """ + + # statistics format string for the header, see unpack below + header_format = ">cBL HHB BHL BBH HQ" + header_size = struct.calcsize(header_format) + + def unpack(self): + """ Unpack the packet into properties of this object. """ + + # unpack fields + try: + (self.marker_raw, + self.version_id, + self.observation_id, + self.station_id, + self.source_info, + # reserved byte + _, + # integration interval, in block periods. This field is 3 bytes, big endian -- combine later + integration_interval_hi, + integration_interval_lo, + self.data_id, + self.nof_signal_inputs, + self.nof_bytes_per_statistic, + self.nof_statistics_per_packet, + self.block_period_raw, + self.block_serial_number) = struct.unpack(self.header_format, self.packet[:self.header_size]) + + self.integration_interval_raw = (integration_interval_hi << 16) + integration_interval_lo + except struct.error as e: + raise ValueError("Error parsing statistics packet") from e + + # unpack the fields we just updated + self.unpack_source_info() + self.unpack_data_id() + + def expected_size(self) -> int: + """ The size this packet should be (header + payload), according to the header. """ + + return self.header_size + self.nof_statistics_per_packet * self.nof_bytes_per_statistic + + def unpack_source_info(self): + """ Unpack the source_info field into properties of this object. """ + + super().unpack_source_info() + self.subband_calibrated_flag = (get_bit_value(self.source_info, 8) != 0) + + def unpack_data_id(self): + """ Unpack the data_id field into properties of this object. """ + + # only useful in specialisations (XST/SST/BST) + pass + + def integration_interval(self) -> float: + """ Returns the integration interval, in seconds. """ + + # Translate to seconds using the block period + return self.integration_interval_raw * self.block_period() + + def header(self) -> dict: + """ Return all the header fields as a dict. """ + + header = super().header() + + header["source_info"]["subband_calibrated_flag"] = self.subband_calibrated_flag + + header.update({ + "data_id": { + "_raw": self.data_id, + }, + "integration_interval_raw": self.integration_interval_raw, + "integration_interval": self.integration_interval(), + "nof_signal_inputs": self.nof_signal_inputs, + "nof_bytes_per_statistic": self.nof_bytes_per_statistic, + "nof_statistics_per_packet": self.nof_statistics_per_packet + }) + + return header + + class SSTPacket(StatisticsPacket): """ Models an SST statistics UDP packet from SDP. @@ -233,15 +411,6 @@ class SSTPacket(StatisticsPacket): payload[nof_statistics_per_packet]: SST statistics, an array of amplitudes per subband. """ - def __init__(self, packet): - super().__init__(packet) - - # We only parse SST packets - if self.marker != 'S': - raise Exception( - "Payload of SST requested of a non-SST packet. Actual packet marker is '{}', but must be 'S'.".format( - self.marker)) - def unpack_data_id(self): super().unpack_data_id() @@ -271,15 +440,6 @@ class XSTPacket(StatisticsPacket): payload[nof_signal_inputs][nof_signal_inputs] the baselines, starting from first_baseline """ - def __init__(self, packet): - super().__init__(packet) - - # We only parse XST packets - if self.marker != 'X': - raise Exception( - "Payload of XST requested of a non-XST packet. Actual packet marker is '{}', but must be 'X'.".format( - self.marker)) - def unpack_data_id(self): super().unpack_data_id() @@ -308,15 +468,6 @@ class BSTPacket(StatisticsPacket): beamlet_index: the number of the beamlet for which this packet holds statistics. """ - def __init__(self, packet): - super().__init__(packet) - - # We only parse BST packets - if self.marker != 'B': - raise Exception( - "Payload of BST requested of a non-BST packet. Actual packet marker is '{}', but must be 'B'.".format( - self.marker)) - def unpack_data_id(self): super().unpack_data_id() @@ -330,6 +481,58 @@ class BSTPacket(StatisticsPacket): return header +# Which class to use for which marker. +# +# NB: Python does not allow us to register from inside the class, +# as we cannot reference the class during its construction. +PACKET_CLASS_FOR_MARKER = { + b'b': BeamletPacket, + b'S': SSTPacket, + b'B': BSTPacket, + b'X': XSTPacket, +} + + +def read_packet(read_func) -> SDPPacket: + """ Read a packet using the given read function, with signature + + read_func(num_bytes: int) -> bytes + + and return it. The packet type is sensed from the data and + the correct subclass of SDPPacket is returned. + + If read_func() returns None, this function will as well. + """ + + # read just the marker + marker = read_func(1) + if not marker: + return None + + # read the packet header based on type + packetClass = PACKET_CLASS_FOR_MARKER[marker] + + # read the rest of the header + header = read_func(packetClass.header_size - len(marker)) + if not header: + return None + + header = marker + header + + # parse the packet header size + packet = packetClass(header) + + # read the payload + payload_size = packet.expected_size() - len(header) + payload = read_func(payload_size) + + if not payload: + return None + + # return full packet + return packetClass(header + payload) + + def main(args=None, **kwargs): # parse one packet from stdin import sys @@ -342,28 +545,22 @@ def main(args=None, **kwargs): offset = 0 while True: - # read just the header - header = sys.stdin.buffer.read(StatisticsPacket.header_size) - if not header: - break + # read the packet from input + packet = read_packet(sys.stdin.buffer.read) - # read the payload - packet = StatisticsPacket(header) - payload_size = packet.expected_size() - len(header) - payload = sys.stdin.buffer.read(payload_size) - - # construct the packet based on type - if packet.marker == 'S': - packet = SSTPacket(header + payload) - elif packet.marker == 'X': - packet = XSTPacket(header + payload) - elif packet.marker == 'B': - packet = BSTPacket(header + payload) + if not packet: + break # print header - print(f"# Packet {nr} starting at offset {offset}") + print(f"# Packet {nr} of class {packet.__class__.__name__} starting at offset {offset} with length {packet.size()}") pprint.pprint(packet.header()) # increment counters nr += 1 - offset += len(header) + len(payload) + offset += packet.size() + + +# this file is very useful to have stand alone to parse raw packet files, so make it work as such +if __name__ == "__main__": + import sys + main(sys.argv) diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py index 960fc49ab9d3fa3f9e3c2ad75429be901e420fbb..d8ac93d22dbee89bab1b48b86d5e011f4b4ae265 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py @@ -4,7 +4,7 @@ import logging import numpy import datetime -from .statistics_packet import SSTPacket, XSTPacket +from .packet import SSTPacket, XSTPacket from tangostationcontrol.common.baselines import nr_baselines, baseline_index, baseline_from_index from tangostationcontrol.clients.statistics_client_thread import StatisticsClientThread diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py b/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py index 9e65f4e51d7fe4b8fbdfc4be52b754ffef9f4bdd..7878fb409978845d94500bcb7d80f9e390aefbb3 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/hdf5_writer.py @@ -12,7 +12,7 @@ from abc import ABC, abstractmethod # import statistics classes with workaround import sys sys.path.append("..") -from tangostationcontrol.devices.sdp.statistics_packet import SSTPacket, XSTPacket +from tangostationcontrol.devices.sdp.packet import SSTPacket, XSTPacket import tangostationcontrol.devices.sdp.statistics_collector as statistics_collector diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py b/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py index cd6c0af4cd5d4a1f8cdc3c2e37d86f6bd655db53..c481d085e00170cfa4771c4af0c79dcc675460b5 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/receiver.py @@ -2,7 +2,7 @@ import socket import sys sys.path.append("..") -from tangostationcontrol.devices.sdp.statistics_packet import StatisticsPacket +from tangostationcontrol.devices.sdp.packet import StatisticsPacket import os class receiver: diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/udp_dev/udp_write_manager.py b/tangostationcontrol/tangostationcontrol/statistics_writer/udp_dev/udp_write_manager.py index 657607d688425ea862aeaf8c08238639acc98873..d8e234225237bac1796f11afe80045c3e09b15d8 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/udp_dev/udp_write_manager.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/udp_dev/udp_write_manager.py @@ -5,7 +5,7 @@ import h5py import numpy as np from statistics_writer.udp_dev import udp_server as udp import netifaces as ni -from statistics_packet import SSTPacket +from packet import SSTPacket __all__ = ["statistics_writer"] diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py b/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py index 7113ee837631789e99218f3356a602637ccac116..4ddeab05e63baa29c791a61ec428effa8e2f47a2 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py @@ -1,5 +1,5 @@ from tangostationcontrol.devices.sdp.statistics_collector import XSTCollector -from tangostationcontrol.devices.sdp.statistics_packet import XSTPacket +from tangostationcontrol.devices.sdp.packet import XSTPacket from tangostationcontrol.test import base @@ -42,6 +42,7 @@ class TestSelectSubbandSlot(base.TestCase): # check where a new subband replaces the oldest self.assertEqual(XSTCollector.MAX_PARALLEL_SUBBANDS - 1, collector.select_subband_slot(200)) + class TestXSTCollector(base.TestCase): def test_valid_packet(self): collector = XSTCollector() @@ -208,4 +209,3 @@ class TestXSTCollector(base.TestCase): self.assertEqual(0, collector.parameters["nof_valid_payloads"][fpga_index]) self.assertEqual(1, collector.parameters["nof_payload_errors"][fpga_index]) -