Skip to content
Snippets Groups Projects
Commit 8c34f250 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-244: Rewrite of StatisticsPacket class: unpack everything in 1 go for...

L2SS-244: Rewrite of StatisticsPacket class: unpack everything in 1 go for efficiency, and split off specialistic SST/XST/BST classes
parent 2f85e1aa
Branches
Tags
1 merge request!56L2SS-244: Expose the SSTs in MPs
from struct import unpack, calcsize import struct
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Tuple from typing import Tuple
import numpy import numpy
__all__ = ["StatisticsPacket"] __all__ = ["StatisticsPacket", "SSTPacket", "XSTPacket", "BSTPacket"]
def get_bit_value(value: bytes, first_bit: int, last_bit:int=None) -> int: def get_bit_value(value: bytes, first_bit: int, last_bit:int=None) -> int:
""" Return bits [first_bit:last_bit] from value, and return their integer value. Bit 0 = LSB. """ Return bits [first_bit:last_bit] from value, and return their integer value. Bit 0 = LSB.
...@@ -24,19 +24,111 @@ class StatisticsPacket(object): ...@@ -24,19 +24,111 @@ class StatisticsPacket(object):
Models a statistics UDP packet from SDP. Models a statistics UDP packet from SDP.
Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers).
The following fields are exposed as properties & functions. The _raw fields come directly
from the packet, and have more user-friendly alternatives for intepretation:
marker_raw packet marker as byte.
marker() packet marker as character. 'S' = SST, 'X' = XST, 'B' = BST
version_id packet format version.
observation_id observation identifier.
station_id station identifier.
source_info: bit field with input information, encoding several other properties.
antenna_band_index: antenna type. 0 = low band, 1 = high band.
nyquist_zone_index: nyquist zone of filter:
0 = 0 -- 1/2 * t_adc Hz (low band),
1 = 1/2 * t_adc -- t_adc Hz (high band),
2 = t_adc -- 3/2 * t_adc Hz (high band).
t_adc: sampling clock. 0 = 160 MHz, 1 = 200 MHz.
fsub_type: sampling method. 0 = critically sampled, 1 = oversampled.
payload_error: 0 = data is ok, 1 = data is corrupted (a fault was encountered).
beam_repositioning_flag: 0 = data is ok, 1 = beam got repositioned during packet construction (BST only).
subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not.
gn_index: global index of FPGA that emitted this packet.
data_id: bit field with payload information, encoding several other properties.
nof_signal_inputs: number of inputs that contributed to data in this packet.
nof_bytes_per_statistics: word size of each statistic.
nof_statistics_per_packet: number of statistic data points in the payload.
integration_interval_raw: integration interval, in block periods.
integration_interval(): integration interval, in seconds.
block_period_raw: block period, in nanoseconds.
block_period(): block period, in seconds.
block_serial_number: timestamp of the data, in block periods since 1970.
timestamp(): timestamp of the data, as a datetime object.
""" """
def __init__(self, packet: bytes): def __init__(self, packet: bytes):
self.packet = packet self.packet = packet
self.unpack()
# Only parse valid statistics packets from SDP, reject everything else # Only parse valid statistics packets from SDP, reject everything else
if type(self.marker) == bytes or self.marker not in 'SBX': if self.marker_raw not in b'SBX':
raise ValueError("Invalid SDP statistics packet: packet marker (first byte) is '{}', not one of 'SBX'.".format(self.marker)) raise ValueError("Invalid SDP statistics packet: packet marker (first byte) is {}, not one of 'SBX'.".format(self.marker))
def unpack(self):
""" Unpack the packet into properties of this object. """
# format string for the header, see unpack below
self.header_format = ">cBL HHB BHL BBH HQ"
self.header_size = struct.calcsize(self.header_format)
# unpack fields
try:
(self.marker_raw,
self.version_id,
self.observation_id,
self.station_id,
self.source_info,
# reserved byte
_,
# integration interval, in block periods. This field is 3 bytes, big endian -- combine later
integration_interval_hi,
integration_interval_lo,
self.data_id,
self.nof_signal_inputs,
self.nof_bytes_per_statistic,
self.nof_statistics_per_packet,
self.block_period_raw,
self.block_serial_number) = struct.unpack(self.header_format, self.packet[:self.header_size])
self.integration_interval_raw = (integration_interval_hi << 16) + integration_interval_lo
except struct.error as e:
raise ValueError("Error parsing statistics packet") from e
# unpack the fields we just updated
self.unpack_source_info()
self.unpack_data_id()
def unpack_source_info(self):
""" Unpack the source_info field into properties of this object. """
self.antenna_band_index = get_bit_value(self.source_info, 15)
self.nyquist_zone_index = get_bit_value(self.source_info, 13, 14)
self.t_adc = get_bit_value(self.source_info, 12)
self.fsub_type = get_bit_value(self.source_info, 11)
self.payload_error = get_bit_value(self.source_info, 10)
self.beam_repositioning_flag = get_bit_value(self.source_info, 9)
self.subband_calibrated_flag = get_bit_value(self.source_info, 8)
# self.source_info 5-7 are reserved
self.gn_index = get_bit_value(self.source_info, 0, 4)
def unpack_data_id(self):
""" Unpack the data_id field into properties of this object. """
# only useful in specialisations (XST/SST/BST)
pass
def expected_size(self) -> int: def expected_size(self) -> int:
""" The size this packet should be (header + payload), according to the header. """ """ The size this packet should be (header + payload), according to the header. """
return 32 + self.nof_statistics_per_packet * self.nof_bytes_per_statistic return self.header_size + self.nof_statistics_per_packet * self.nof_bytes_per_statistic
@property @property
def marker(self) -> str: def marker(self) -> str:
...@@ -47,84 +139,14 @@ class StatisticsPacket(object): ...@@ -47,84 +139,14 @@ class StatisticsPacket(object):
'X' = XST 'X' = XST
""" """
raw_marker = unpack("c",self.packet[0:1])[0]
try: try:
return raw_marker.decode('ascii') return self.marker_raw.decode('ascii')
except UnicodeDecodeError: except UnicodeDecodeError:
# non-ascii (>127) character, return as binary # non-ascii (>127) character, return as binary
# #
# this is typically not visible to the user, as these packets are not SDP statistics packets, # this is typically not visible to the user, as these packets are not SDP statistics packets,
# which the constructor will refuse to accept. # which the constructor will refuse to accept.
return raw_marker return self.marker_raw
@property
def version_id(self) -> int:
""" Return the version of this packet. """
return unpack("B",self.packet[1:2])[0]
@property
def observation_id(self) -> int:
""" Return the ID of the observation running when this packet was generated. """
return unpack(">I",self.packet[2:6])[0]
@property
def station_id(self) -> int:
""" Return the number of the station this packet was generated on. """
return unpack(">H",self.packet[6:8])[0]
@property
def source_info(self) -> int:
""" Return a dict with the source_info flags. The dict contains the following fields:
_raw: raw value of the source_info field in the packet, as an integer.
antenna_band_index: antenna type. 0 = low band, 1 = high band.
nyquist_zone_index: nyquist zone of filter:
0 = 0 -- 1/2 * t_adc Hz (low band),
1 = 1/2 * t_adc -- t_adc Hz (high band),
2 = t_adc -- 3/2 * t_adc Hz (high band).
t_adc: sampling clock. 0 = 160 MHz, 1 = 200 MHz.
fsub_type: sampling method. 0 = critically sampled, 1 = oversampled.
payload_error: 0 = data is ok, 1 = data is corrupted (a fault was encountered).
beam_repositioning_flag: 0 = data is ok, 1 = beam got repositioned during packet construction (BST only).
subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not
reserved: reserved bits
gn_index: global index of FPGA that emitted this packet. """
bits = unpack(">H",self.packet[8:10])[0]
return {
"_raw": bits,
"antenna_band_index": get_bit_value(bits, 15),
"nyquist_zone_index": get_bit_value(bits, 13, 14),
"t_adc": get_bit_value(bits, 12),
"fsub_type": get_bit_value(bits, 11),
"payload_error": get_bit_value(bits, 10),
"beam_repositioning_flag": get_bit_value(bits, 9),
"subband_calibrated_flag": get_bit_value(bits, 8),
"reserved": get_bit_value(bits, 5, 7),
"gn_index": get_bit_value(bits, 0, 4),
}
@property
def payload_error(self) -> bool:
return self.source_info["payload_error"] == 1
@property
def reserved(self) -> bytes:
""" Reserved bytes. """
return self.packet[10:11]
@property
def integration_interval_raw(self) -> int:
""" Returns the integration interval, in blocks. """
# This field is 3 bytes, big endian, so we need to prepend a 0 to parse it as a 32-bit integer.
return unpack(">I", b'0' + self.packet[11:14])[0]
def integration_interval(self) -> float: def integration_interval(self) -> float:
""" Returns the integration interval, in seconds. """ """ Returns the integration interval, in seconds. """
...@@ -132,85 +154,11 @@ class StatisticsPacket(object): ...@@ -132,85 +154,11 @@ class StatisticsPacket(object):
# Translate to seconds using the block period # Translate to seconds using the block period
return self.integration_interval_raw * self.block_period() return self.integration_interval_raw * self.block_period()
@property
def data_id_raw(self) -> int:
""" Returns the generic data identifier. """
return unpack(">I",self.packet[14:18])[0]
@property
def sst_signal_input_index(self) -> int:
""" SST: Returns the input (antenna) number for which this packet holds statistics. """
if self.marker != 'S':
raise Exception("Property of SST requested of a non-SST packet. Actual packet marker is '{}', but must be 'S'.".format(self.marker))
return get_bit_value(self.data_id_raw, 0, 7)
@property
def bst_beamlet_index(self) -> int:
""" BST: Returns the number of the beamlet for which this packet holds statistics. """
if self.marker != 'B':
raise Exception("Property of BST requested of a non-BST packet. Actual packet marker is '{}', but must be 'B'.".format(self.marker))
return get_bit_value(self.data_id_raw, 0, 15)
@property
def xst_subband_index(self) -> int:
""" XST: Returns the number of the subband for which this packet holds statistics. """
if self.marker != 'X':
raise Exception("Property of XST requested of a non-XST packet. Actual packet marker is '{}', but must be 'X'.".format(self.marker))
return get_bit_value(self.data_id_raw, 16, 24)
@property
def xst_baseline(self) -> Tuple[int, int]:
""" XST: Returns the pair of inputs (antennae) for which this packet holds statistics. """
if self.marker != 'X':
raise Exception("Property of XST requested of a non-XST packet. Actual packet marker is '{}', but must be 'X'.".format(self.marker))
signal_input_a = get_bit_value(self.data_id_raw, 8, 15)
signal_input_b = get_bit_value(self.data_id_raw, 0, 7)
return (signal_input_a, signal_input_b)
@property
def nof_signal_inputs(self) -> int:
""" Number of inputs that were used for constructing the payload. """
return unpack(">B",self.packet[18:19])[0]
@property
def nof_bytes_per_statistic(self) -> int:
""" Word size for the payload. """
return unpack(">B",self.packet[19:20])[0]
@property
def nof_statistics_per_packet(self) -> int:
""" Number of data points in the payload. """
return unpack(">H",self.packet[20:22])[0]
@property
def block_period_raw(self) -> int:
""" Return the block period, in nanoseconds. """
return unpack(">H",self.packet[22:24])[0]
def block_period(self) -> float: def block_period(self) -> float:
""" Return the block period, in seconds. """ """ Return the block period, in seconds. """
return self.block_period_raw / 1e9 return self.block_period_raw / 1e9
@property
def block_serial_number(self) -> int:
""" Block index since epoch (1970). """
return unpack(">Q",self.packet[24:32])[0]
def timestamp(self) -> datetime: def timestamp(self) -> datetime:
""" Returns the timestamp of the data in this packet. """ Returns the timestamp of the data in this packet.
...@@ -231,11 +179,22 @@ class StatisticsPacket(object): ...@@ -231,11 +179,22 @@ class StatisticsPacket(object):
"version_id": self.version_id, "version_id": self.version_id,
"observation_id": self.observation_id, "observation_id": self.observation_id,
"station_id": self.station_id, "station_id": self.station_id,
"source_info": self.source_info, "source_info": {
"reserved": self.reserved, "_raw": self.source_info,
"antenna_band_index": self.antenna_band_index,
"nyquist_zone_index": self.nyquist_zone_index,
"t_adc": self.t_adc,
"fsub_type": self.fsub_type,
"payload_error": self.payload_error,
"beam_repositioning_flag": self.beam_repositioning_flag,
"subband_calibrated_flag": self.subband_calibrated_flag,
"gn_index": self.gn_index,
},
"data_id": {
"_raw": self.data_id,
},
"integration_interval_raw": self.integration_interval_raw, "integration_interval_raw": self.integration_interval_raw,
"integration_interval": self.integration_interval(), "integration_interval": self.integration_interval(),
"data_id_raw": self.data_id_raw,
"nof_signal_inputs": self.nof_signal_inputs, "nof_signal_inputs": self.nof_signal_inputs,
"nof_bytes_per_statistic": self.nof_bytes_per_statistic, "nof_bytes_per_statistic": self.nof_bytes_per_statistic,
"nof_statistics_per_packet": self.nof_statistics_per_packet, "nof_statistics_per_packet": self.nof_statistics_per_packet,
...@@ -245,36 +204,108 @@ class StatisticsPacket(object): ...@@ -245,36 +204,108 @@ class StatisticsPacket(object):
"timestamp": self.timestamp(), "timestamp": self.timestamp(),
} }
# add statistics-specific fields
if self.marker == 'S':
header.update({
"sst_signal_input_index": self.sst_signal_input_index,
})
elif self.marker == 'B':
header.update({
"bst_beamlet_index": self.bst_beamlet_index,
})
elif self.marker == 'X':
header.update({
"xst_subband_index": self.xst_subband_index,
"xst_baseline": self.xst_baseline,
})
return header return header
@property class SSTPacket(StatisticsPacket):
def payload_sst(self) -> numpy.array: """
""" The payload of this packet, interpreted as SST data. """ Models an SST statistics UDP packet from SDP.
The following fields are exposed as properties & functions.
signal_input_index: input (antenna polarisation) index for which this packet contains statistics
payload[nof_statistics_per_packet]: SST statistics, an array of amplitudes per subband.
"""
def __init__(self, packet):
super().__init__(packet)
# We only parse SST packets
if self.marker != 'S': if self.marker != 'S':
raise Exception("Payload of SST requested of a non-SST packet. Actual packet marker is '{}', but must be 'S'.".format(self.marker)) raise Exception("Payload of SST requested of a non-SST packet. Actual packet marker is '{}', but must be 'S'.".format(self.marker))
def unpack_data_id(self):
super().unpack_data_id()
self.signal_input_index = get_bit_value(self.data_id, 0, 7)
def header(self):
header = super().header()
header["data_id"]["signal_input_index"] = self.signal_input_index
return header
@property
def payload(self) -> numpy.array:
""" The payload of this packet, interpreted as SST data. """
# derive which and how many elements to read from the packet header # derive which and how many elements to read from the packet header
bytecount_to_unsigned_struct_type = { 1: 'B', 2: 'H', 4: 'I', 8: 'Q' } bytecount_to_unsigned_struct_type = { 1: 'B', 2: 'H', 4: 'I', 8: 'Q' }
format_str = ">{}{}".format(self.nof_statistics_per_packet, bytecount_to_unsigned_struct_type[self.nof_bytes_per_statistic]) format_str = ">{}{}".format(self.nof_statistics_per_packet, bytecount_to_unsigned_struct_type[self.nof_bytes_per_statistic])
return numpy.array(unpack(format_str, self.packet[32:32+calcsize(format_str)])) return numpy.array(struct.unpack(format_str, self.packet[self.header_size:self.header_size + struct.calcsize(format_str)]))
class XSTPacket(StatisticsPacket):
"""
Models an XST statistics UDP packet from SDP.
The following fields are exposed as properties & functions.
subband_index: subband number for which this packet contains statistics.
baseline: antenna pair for which this packet contains statistics.
"""
def __init__(self, packet):
super().__init__(packet)
# We only parse XST packets
if self.marker != 'X':
raise Exception("Payload of XST requested of a non-XST packet. Actual packet marker is '{}', but must be 'X'.".format(self.marker))
def unpack_data_id(self):
super().unpack_data_id()
self.subband_index = get_bit_value(self.data_id, 16, 24)
self.baseline = (get_bit_value(self.data_id, 8, 15), get_bit_value(self.data_id, 0, 7))
def header(self):
header = super().header()
header["data_id"]["subband_index"] = self.subband_index
header["data_id"]["baseline"] = self.baseline
return header
class BSTPacket(StatisticsPacket):
"""
Models an BST statistics UDP packet from SDP.
The following fields are exposed as properties & functions.
bst_beamlet_index: the number of the beamlet for which this packet holds statistics.
"""
def __init__(self, packet):
super().__init__(packet)
# We only parse BST packets
if self.marker != 'B':
raise Exception("Payload of BST requested of a non-BST packet. Actual packet marker is '{}', but must be 'B'.".format(self.marker))
def unpack_data_id(self):
super().unpack_data_id()
self.bst_beamlet_index = get_bit_value(self.data_id, 0, 15)
def header(self):
header = super().header()
header["data_id"]["bst_beamlet_index"] = self.bst_beamlet_index
return header
if __name__ == "__main__": if __name__ == "__main__":
# parse one packet from stdin # parse one packet from stdin
...@@ -284,9 +315,9 @@ if __name__ == "__main__": ...@@ -284,9 +315,9 @@ if __name__ == "__main__":
# read all of stdin, even though we only parse the first packet. we're too lazy to intelligently decide when # read all of stdin, even though we only parse the first packet. we're too lazy to intelligently decide when
# the packet is complete and can stop reading. # the packet is complete and can stop reading.
data = sys.stdin.buffer.read() data = sys.stdin.buffer.read()
packet = StatisticsPacket(data) packet = SSTPacket(data)
# print header & payload # print header & payload
pprint.pprint(packet.header()) pprint.pprint(packet.header())
pprint.pprint(packet.payload_sst) pprint.pprint(packet.payload)
...@@ -2,7 +2,7 @@ import queue ...@@ -2,7 +2,7 @@ import queue
from threading import Thread from threading import Thread
import socket import socket
from util.comms_client import CommClient from util.comms_client import CommClient
from clients.StatisticsPacket import StatisticsPacket from clients.StatisticsPacket import SSTPacket
from queue import Queue from queue import Queue
...@@ -224,19 +224,14 @@ class SST_collector(Thread): ...@@ -224,19 +224,14 @@ class SST_collector(Thread):
try: try:
try: try:
fields = StatisticsPacket(packet) fields = SSTPacket(packet)
except ValueError: except ValueError:
# could not parse header # not an SST packet
self.parameters["nof_invalid_packets"] += numpy.uint64(1)
return
if fields.marker != 'S':
# packet is not SST
self.parameters["nof_invalid_packets"] += numpy.uint64(1) self.parameters["nof_invalid_packets"] += numpy.uint64(1)
return return
# which input this packet contains data for # which input this packet contains data for
input_index = fields.sst_signal_input_index input_index = fields.signal_input_index
if input_index >= self.MAX_INPUTS: if input_index >= self.MAX_INPUTS:
# packet describes an input that is out of bounds for us # packet describes an input that is out of bounds for us
...@@ -250,7 +245,7 @@ class SST_collector(Thread): ...@@ -250,7 +245,7 @@ class SST_collector(Thread):
# process the packet # process the packet
self.parameters["nof_valid_payloads"][input_index] += numpy.uint64(1) self.parameters["nof_valid_payloads"][input_index] += numpy.uint64(1)
self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload_sst self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload
self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp()) self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp())
self.parameters["integration_intervals"][input_index] = fields.integration_interval() self.parameters["integration_intervals"][input_index] = fields.integration_interval()
except Exception as e: except Exception as e:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment