Skip to content
Snippets Groups Projects
Commit 4c3e7a36 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-676-write-beamlet-packet-parser' into 'master'

L2SS-676: Support parsing beamlet packets

Closes L2SS-676

See merge request !266
parents 397b37d6 a8565560
Branches
Tags
1 merge request!266L2SS-676: Support parsing beamlet packets
......@@ -53,7 +53,7 @@ console_scripts =
l2ss-cold-start = tangostationcontrol.toolkit.lts_cold_start:main
l2ss-hardware-device-template = tangostationcontrol.examples.HW_device_template:main
l2ss-ini-device = tangostationcontrol.examples.load_from_disk.ini_device:main
l2ss-parse-statistics-packet = tangostationcontrol.devices.sdp.statistics_packet:main
l2ss-parse-sdp-packet = tangostationcontrol.devices.sdp.packet:main
l2ss-random-data = tangostationcontrol.test.devices.random_data:main
l2ss-snmp = tangostationcontrol.examples.snmp.snmp:main
l2ss-version = tangostationcontrol.common.lofar_version:main
......
......@@ -2,8 +2,10 @@ import struct
from datetime import datetime, timezone
import numpy
__all__ = ["StatisticsPacket", "SSTPacket", "XSTPacket", "BSTPacket"]
__all__ = ["StatisticsPacket", "SDPPacket", "SSTPacket", "XSTPacket", "BSTPacket", "read_packet", "PACKET_CLASS_FOR_MARKER"]
N_POL = 2
N_COMPLEX = 2
def get_bit_value(value: bytes, first_bit: int, last_bit: int = None) -> int:
""" Return bits [first_bit:last_bit] from value, and return their integer value. Bit 0 = LSB.
......@@ -20,9 +22,9 @@ def get_bit_value(value: bytes, first_bit: int, last_bit: int = None) -> int:
return value >> first_bit & ((1 << (last_bit - first_bit + 1)) - 1)
class StatisticsPacket(object):
class SDPPacket(object):
"""
Models a statistics UDP packet from SDP.
Models a UDP packet from SDP.
Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers).
......@@ -46,17 +48,8 @@ class StatisticsPacket(object):
fsub_type: sampling method. 0 = critically sampled, 1 = oversampled.
payload_error: 0 = data is ok, 1 = data is corrupted (a fault was encountered).
beam_repositioning_flag: 0 = data is ok, 1 = beam got repositioned during packet construction (BST only).
subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not.
gn_index: global index of FPGA that emitted this packet.
data_id: bit field with payload information, encoding several other properties.
nof_signal_inputs: number of inputs that contributed to data in this packet.
nof_bytes_per_statistics: word size of each statistic.
nof_statistics_per_packet: number of statistic data points in the payload.
integration_interval_raw: integration interval, in block periods.
integration_interval(): integration interval, in seconds.
block_period_raw: block period, in nanoseconds.
block_period(): block period, in seconds.
block_serial_number: timestamp of the data, in block periods since 1970.
......@@ -70,15 +63,26 @@ class StatisticsPacket(object):
self.unpack()
# Only parse valid statistics packets from SDP, reject everything else
if self.marker_raw not in b'SBX':
if self.marker_raw not in self.valid_markers():
raise ValueError(
"Invalid SDP statistics packet: packet marker (first byte) is {}, not one of 'SBX'.".format(
self.marker))
f"Invalid packet of type {self.__class__.__name__}: packet marker (first byte) is {self.marker}, not one of {self.valid_markers()}.")
# format string for the header, see unpack below
header_format = ">cBL HHB BHL BBH HQ"
header_format = ">cBL HH xxxxx xxxxxxx HQ"
header_size = struct.calcsize(header_format)
@classmethod
def valid_markers(cls):
""" Valid values for the 'marker_raw' header field for this class.
Each new packet class that introduces a new marker should be added
to the PACKET_CLASS_FOR_MARKER registry, which holds the mapping
marker -> packet class.
"""
# return all markers registered in PACKET_CLASS_FOR_MARKER which are for this class or any of its specialisations
return [marker for marker, klass in PACKET_CLASS_FOR_MARKER.items() if issubclass(klass, cls)]
def unpack(self):
""" Unpack the packet into properties of this object. """
......@@ -89,25 +93,13 @@ class StatisticsPacket(object):
self.observation_id,
self.station_id,
self.source_info,
# reserved byte
_,
# integration interval, in block periods. This field is 3 bytes, big endian -- combine later
integration_interval_hi,
integration_interval_lo,
self.data_id,
self.nof_signal_inputs,
self.nof_bytes_per_statistic,
self.nof_statistics_per_packet,
self.block_period_raw,
self.block_serial_number) = struct.unpack(self.header_format, self.packet[:self.header_size])
self.integration_interval_raw = (integration_interval_hi << 16) + integration_interval_lo
except struct.error as e:
raise ValueError("Error parsing statistics packet") from e
# unpack the fields we just updated
self.unpack_source_info()
self.unpack_data_id()
def unpack_source_info(self):
""" Unpack the source_info field into properties of this object. """
......@@ -118,20 +110,19 @@ class StatisticsPacket(object):
self.fsub_type = get_bit_value(self.source_info, 11)
self.payload_error = (get_bit_value(self.source_info, 10) != 0)
self.beam_repositioning_flag = (get_bit_value(self.source_info, 9) != 0)
self.subband_calibrated_flag = (get_bit_value(self.source_info, 8) != 0)
# self.source_info 5-7 are reserved
# self.source_info 5-8 are reserved
self.gn_index = get_bit_value(self.source_info, 0, 4)
def unpack_data_id(self):
""" Unpack the data_id field into properties of this object. """
# only useful in specialisations (XST/SST/BST)
pass
def expected_size(self) -> int:
""" The size this packet should be (header + payload), according to the header. """
return self.header_size + self.nof_statistics_per_packet * self.nof_bytes_per_statistic
# the generic header does not contain enough information to determine the payload size
raise NotImplementedError
def size(self) -> int:
""" The actual size of this packet. """
return len(self.packet)
@property
def marker(self) -> str:
......@@ -140,6 +131,7 @@ class StatisticsPacket(object):
'S' = SST
'B' = BST
'X' = XST
'b' = beamlet
"""
try:
......@@ -151,12 +143,6 @@ class StatisticsPacket(object):
# which the constructor will refuse to accept.
return self.marker_raw
def integration_interval(self) -> float:
""" Returns the integration interval, in seconds. """
# Translate to seconds using the block period
return self.integration_interval_raw * self.block_period()
def block_period(self) -> float:
""" Return the block period, in seconds. """
......@@ -190,17 +176,8 @@ class StatisticsPacket(object):
"fsub_type": self.fsub_type,
"payload_error": self.payload_error,
"beam_repositioning_flag": self.beam_repositioning_flag,
"subband_calibrated_flag": self.subband_calibrated_flag,
"gn_index": self.gn_index,
},
"data_id": {
"_raw": self.data_id,
},
"integration_interval_raw": self.integration_interval_raw,
"integration_interval": self.integration_interval(),
"nof_signal_inputs": self.nof_signal_inputs,
"nof_bytes_per_statistic": self.nof_bytes_per_statistic,
"nof_statistics_per_packet": self.nof_statistics_per_packet,
"block_period_raw": self.block_period_raw,
"block_period": self.block_period(),
"block_serial_number": self.block_serial_number,
......@@ -221,6 +198,207 @@ class StatisticsPacket(object):
struct.unpack(format_str, self.packet[self.header_size:self.header_size + struct.calcsize(format_str)]))
class BeamletPacket(SDPPacket):
"""
Models a beamlet UDP packet from SDP.
Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers).
The following additional fields are exposed as properties & functions.
beamlet_width: bits/beamlet (4, 8, 16).
beamlet_scale: scaling of beamlets prior to quantisation.
beamlet_index: index of the first beamlet in the payload.
nof_blocks_per_packet: number of blocks (timestamps) of data in the payload.
nof_beamlets_per_block: number of beamlets in the payload.
"""
# format string for the header, see unpack below
header_format = ">cBL HH xxxxx HHBHHQ"
header_size = struct.calcsize(header_format)
def unpack(self):
""" Unpack the packet into properties of this object. """
# unpack fields
try:
(self.marker_raw,
self.version_id,
self.observation_id,
self.station_id,
self.source_info,
self.beamlet_scale,
self.beamlet_index,
self.nof_blocks_per_packet,
self.nof_beamlets_per_block,
self.block_period_raw,
self.block_serial_number) = struct.unpack(self.header_format, self.packet[:self.header_size])
except struct.error as e:
raise ValueError("Error parsing beamlet packet") from e
# unpack the fields we just updated
self.unpack_source_info()
def unpack_source_info(self):
""" Unpack the source_info field into properties of this object. """
super().unpack_source_info()
self.beamlet_width = get_bit_value(self.source_info, 5, 8) or 16 # 0 -> 16
def expected_size(self) -> int:
""" The size this packet should be (header + payload), according to the header. """
return self.header_size + self.nof_statistics_per_packet * self.nof_bytes_per_statistic
@property
def nof_statistics_per_packet(self) -> int:
return self.nof_blocks_per_packet * self.nof_beamlets_per_block * N_POL * N_COMPLEX * self.beamlet_width // 8
@property
def nof_bytes_per_statistic(self) -> int:
if self.beamlet_width == 8:
# cint8 data [-127, 127]
return 1
elif self.beamlet_width == 16:
# cint16 data [-32767, 32767]
return 2
elif self.beamlet_width == 4:
# cint4 data [-7, 7], packet in 1 byte
return 1
def header(self) -> dict:
""" Return all the header fields as a dict. """
header = super().header()
header["source_info"]["beamlet_width"] = self.beamlet_width
header.update({
"beamlet_scale": self.beamlet_scale,
"beamlet_index": self.beamlet_index,
"nof_blocks_per_packet": self.nof_blocks_per_packet,
"nof_beamlets_per_block": self.nof_beamlets_per_block
})
return header
def payload_raw(self):
if self.beamlet_width == 4:
return super().payload(signed=True).reshape(self.nof_blocks_per_packet, self.nof_beamlets_per_block, N_POL)
else:
return super().payload(signed=True).reshape(self.nof_blocks_per_packet, self.nof_beamlets_per_block, N_POL, N_COMPLEX)
@property
def payload(self):
if self.beamlet_width == 4:
# real in low 4 bits, imag in high 4 bits (both signed!)
payload_raw = self.payload_raw()
# shift extends sign, so we prefer it over bitwise and
return (payload_raw << 4 >> 4) + (payload_raw >> 4) * 1j
else:
return self.payload_raw().astype(float).view(numpy.complex64)
class StatisticsPacket(SDPPacket):
"""
Models a statistics UDP packet from SDP.
Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers).
The following additional fields are exposed as properties & functions.
subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not.
data_id: bit field with payload information, encoding several other properties.
nof_signal_inputs: number of inputs that contributed to data in this packet.
nof_bytes_per_statistics: word size of each statistic.
nof_statistics_per_packet: number of statistic data points in the payload.
integration_interval_raw: integration interval, in block periods.
integration_interval(): integration interval, in seconds.
"""
# statistics format string for the header, see unpack below
header_format = ">cBL HHB BHL BBH HQ"
header_size = struct.calcsize(header_format)
def unpack(self):
""" Unpack the packet into properties of this object. """
# unpack fields
try:
(self.marker_raw,
self.version_id,
self.observation_id,
self.station_id,
self.source_info,
# reserved byte
_,
# integration interval, in block periods. This field is 3 bytes, big endian -- combine later
integration_interval_hi,
integration_interval_lo,
self.data_id,
self.nof_signal_inputs,
self.nof_bytes_per_statistic,
self.nof_statistics_per_packet,
self.block_period_raw,
self.block_serial_number) = struct.unpack(self.header_format, self.packet[:self.header_size])
self.integration_interval_raw = (integration_interval_hi << 16) + integration_interval_lo
except struct.error as e:
raise ValueError("Error parsing statistics packet") from e
# unpack the fields we just updated
self.unpack_source_info()
self.unpack_data_id()
def expected_size(self) -> int:
""" The size this packet should be (header + payload), according to the header. """
return self.header_size + self.nof_statistics_per_packet * self.nof_bytes_per_statistic
def unpack_source_info(self):
""" Unpack the source_info field into properties of this object. """
super().unpack_source_info()
self.subband_calibrated_flag = (get_bit_value(self.source_info, 8) != 0)
def unpack_data_id(self):
""" Unpack the data_id field into properties of this object. """
# only useful in specialisations (XST/SST/BST)
pass
def integration_interval(self) -> float:
""" Returns the integration interval, in seconds. """
# Translate to seconds using the block period
return self.integration_interval_raw * self.block_period()
def header(self) -> dict:
""" Return all the header fields as a dict. """
header = super().header()
header["source_info"]["subband_calibrated_flag"] = self.subband_calibrated_flag
header.update({
"data_id": {
"_raw": self.data_id,
},
"integration_interval_raw": self.integration_interval_raw,
"integration_interval": self.integration_interval(),
"nof_signal_inputs": self.nof_signal_inputs,
"nof_bytes_per_statistic": self.nof_bytes_per_statistic,
"nof_statistics_per_packet": self.nof_statistics_per_packet
})
return header
class SSTPacket(StatisticsPacket):
"""
Models an SST statistics UDP packet from SDP.
......@@ -233,15 +411,6 @@ class SSTPacket(StatisticsPacket):
payload[nof_statistics_per_packet]: SST statistics, an array of amplitudes per subband.
"""
def __init__(self, packet):
super().__init__(packet)
# We only parse SST packets
if self.marker != 'S':
raise Exception(
"Payload of SST requested of a non-SST packet. Actual packet marker is '{}', but must be 'S'.".format(
self.marker))
def unpack_data_id(self):
super().unpack_data_id()
......@@ -271,15 +440,6 @@ class XSTPacket(StatisticsPacket):
payload[nof_signal_inputs][nof_signal_inputs] the baselines, starting from first_baseline
"""
def __init__(self, packet):
super().__init__(packet)
# We only parse XST packets
if self.marker != 'X':
raise Exception(
"Payload of XST requested of a non-XST packet. Actual packet marker is '{}', but must be 'X'.".format(
self.marker))
def unpack_data_id(self):
super().unpack_data_id()
......@@ -308,15 +468,6 @@ class BSTPacket(StatisticsPacket):
beamlet_index: the number of the beamlet for which this packet holds statistics.
"""
def __init__(self, packet):
super().__init__(packet)
# We only parse BST packets
if self.marker != 'B':
raise Exception(
"Payload of BST requested of a non-BST packet. Actual packet marker is '{}', but must be 'B'.".format(
self.marker))
def unpack_data_id(self):
super().unpack_data_id()
......@@ -330,6 +481,58 @@ class BSTPacket(StatisticsPacket):
return header
# Which class to use for which marker.
#
# NB: Python does not allow us to register from inside the class,
# as we cannot reference the class during its construction.
PACKET_CLASS_FOR_MARKER = {
b'b': BeamletPacket,
b'S': SSTPacket,
b'B': BSTPacket,
b'X': XSTPacket,
}
def read_packet(read_func) -> SDPPacket:
""" Read a packet using the given read function, with signature
read_func(num_bytes: int) -> bytes
and return it. The packet type is sensed from the data and
the correct subclass of SDPPacket is returned.
If read_func() returns None, this function will as well.
"""
# read just the marker
marker = read_func(1)
if not marker:
return None
# read the packet header based on type
packetClass = PACKET_CLASS_FOR_MARKER[marker]
# read the rest of the header
header = read_func(packetClass.header_size - len(marker))
if not header:
return None
header = marker + header
# parse the packet header size
packet = packetClass(header)
# read the payload
payload_size = packet.expected_size() - len(header)
payload = read_func(payload_size)
if not payload:
return None
# return full packet
return packetClass(header + payload)
def main(args=None, **kwargs):
# parse one packet from stdin
import sys
......@@ -342,28 +545,22 @@ def main(args=None, **kwargs):
offset = 0
while True:
# read just the header
header = sys.stdin.buffer.read(StatisticsPacket.header_size)
if not header:
break
# read the packet from input
packet = read_packet(sys.stdin.buffer.read)
# read the payload
packet = StatisticsPacket(header)
payload_size = packet.expected_size() - len(header)
payload = sys.stdin.buffer.read(payload_size)
# construct the packet based on type
if packet.marker == 'S':
packet = SSTPacket(header + payload)
elif packet.marker == 'X':
packet = XSTPacket(header + payload)
elif packet.marker == 'B':
packet = BSTPacket(header + payload)
if not packet:
break
# print header
print(f"# Packet {nr} starting at offset {offset}")
print(f"# Packet {nr} of class {packet.__class__.__name__} starting at offset {offset} with length {packet.size()}")
pprint.pprint(packet.header())
# increment counters
nr += 1
offset += len(header) + len(payload)
offset += packet.size()
# this file is very useful to have stand alone to parse raw packet files, so make it work as such
if __name__ == "__main__":
import sys
main(sys.argv)
......@@ -4,7 +4,7 @@ import logging
import numpy
import datetime
from .statistics_packet import SSTPacket, XSTPacket
from .packet import SSTPacket, XSTPacket
from tangostationcontrol.common.baselines import nr_baselines, baseline_index, baseline_from_index
from tangostationcontrol.clients.statistics_client_thread import StatisticsClientThread
......
......@@ -12,7 +12,7 @@ from abc import ABC, abstractmethod
# import statistics classes with workaround
import sys
sys.path.append("..")
from tangostationcontrol.devices.sdp.statistics_packet import SSTPacket, XSTPacket
from tangostationcontrol.devices.sdp.packet import SSTPacket, XSTPacket
import tangostationcontrol.devices.sdp.statistics_collector as statistics_collector
......
......@@ -2,7 +2,7 @@ import socket
import sys
sys.path.append("..")
from tangostationcontrol.devices.sdp.statistics_packet import StatisticsPacket
from tangostationcontrol.devices.sdp.packet import StatisticsPacket
import os
class receiver:
......
......@@ -5,7 +5,7 @@ import h5py
import numpy as np
from statistics_writer.udp_dev import udp_server as udp
import netifaces as ni
from statistics_packet import SSTPacket
from packet import SSTPacket
__all__ = ["statistics_writer"]
......
from tangostationcontrol.devices.sdp.statistics_collector import XSTCollector
from tangostationcontrol.devices.sdp.statistics_packet import XSTPacket
from tangostationcontrol.devices.sdp.packet import XSTPacket
from tangostationcontrol.test import base
......@@ -42,6 +42,7 @@ class TestSelectSubbandSlot(base.TestCase):
# check where a new subband replaces the oldest
self.assertEqual(XSTCollector.MAX_PARALLEL_SUBBANDS - 1, collector.select_subband_slot(200))
class TestXSTCollector(base.TestCase):
def test_valid_packet(self):
collector = XSTCollector()
......@@ -208,4 +209,3 @@ class TestXSTCollector(base.TestCase):
self.assertEqual(0, collector.parameters["nof_valid_payloads"][fpga_index])
self.assertEqual(1, collector.parameters["nof_payload_errors"][fpga_index])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment