# SDP_statistics.py (6.06 KiB), authored by Jan David Mol.
# Code owners: assign users and groups as approvers for specific file changes.
from struct import unpack, calcsize
from datetime import datetime, timezone
import numpy
# Names exported by `from <this module> import *`; only the packet model is listed.
__all__ = ["StatisticsPacket"]
def get_bit_value(value: int, first_bit: int, last_bit: int = None) -> int:
    """ Return bits [first_bit:last_bit] (both inclusive) of the integer 'value', as an integer.

        Bit 0 = LSB.

        For example, extracting bits 2-3 from 0b01100 returns 11 binary = 3 decimal:

            get_bit_value(0b01100, 2, 3) == 3

        If 'last_bit' is not given, just the value of bit 'first_bit' is returned.

        value:     integer to extract the bits from (not a bytes object: it is shifted and masked).
        first_bit: index of the lowest bit to extract.
        last_bit:  index of the highest bit to extract; defaults to 'first_bit'. """

    # default last_bit to first_bit, selecting a single bit
    if last_bit is None:
        last_bit = first_bit

    # mask with one bit set for every requested position
    nof_bits = last_bit - first_bit + 1
    mask = (1 << nof_bits) - 1

    # move the requested range down to bit 0, then cut off everything above it
    return (value >> first_bit) & mask
class StatisticsPacket(object):
    """
       Models a statistics UDP packet from SDP.

       Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers).

       Every field is decoded from the raw bytes on each access; nothing is cached.
       Multi-byte fields are parsed as little endian. Header layout, as parsed here:

         bytes  0- 0  marker
         bytes  1- 1  version_id
         bytes  2- 5  observation_id
         bytes  6- 7  station_id
         bytes  8- 9  source_info
         bytes 10-10  reserved
         bytes 11-13  integration_interval_raw
         bytes 14-17  data_id
         bytes 18-18  nof_signal_inputs
         bytes 19-19  nof_bytes_per_statistic
         bytes 20-21  nof_statistics_per_packet
         bytes 22-23  block_period_raw
         bytes 24-31  block_serial_number
         bytes 32-..  payload
    """

    def __init__(self, packet: bytes):
        """ Wrap the raw bytes of a single packet. The bytes are not validated here. """

        self.packet = packet

    @property
    def marker(self) -> str:
        """ Return the type of statistic:

            'S' = SST
            'B' = BST
            'X' = XST

            If the marker byte is not valid ASCII, it is returned as a bytes object instead. """

        raw_marker = unpack("c", self.packet[0:1])[0]

        try:
            return raw_marker.decode('ascii')
        except UnicodeDecodeError:
            # non-ascii (>127) character, return as binary
            return raw_marker

    @property
    def version_id(self) -> int:
        """ Return the version of this packet. """

        return unpack("B", self.packet[1:2])[0]

    @property
    def observation_id(self) -> int:
        """ Return the ID of the observation running when this packet was generated. """

        return unpack("<I", self.packet[2:6])[0]

    @property
    def station_id(self) -> int:
        """ Return the number of the station this packet was generated on. """

        return unpack("<H", self.packet[6:8])[0]

    @property
    def source_info(self) -> dict:
        """ Return a dict with the source_info flags.

            The unparsed 16-bit value is available under the key "_raw". """

        bits = unpack("<H", self.packet[8:10])[0]

        return {
            "_raw": bits,
            "antenna_band_index": get_bit_value(bits, 15),
            "nyquist_zone_index": get_bit_value(bits, 13, 14),
            "t_adc": get_bit_value(bits, 12),
            "fsub_type": get_bit_value(bits, 11),
            "payload_error": get_bit_value(bits, 10),
            "beam_repositioning_flag": get_bit_value(bits, 9),
            "subband_calibrated_flag": get_bit_value(bits, 8),
            "reserved": get_bit_value(bits, 5, 7),
            "gn_index": get_bit_value(bits, 0, 4),
        }

    @property
    def reserved(self) -> bytes:
        """ Reserved bytes. """

        return self.packet[10:11]

    @property
    def integration_interval_raw(self) -> int:
        """ Returns the integration interval, in blocks. """

        # This field is 3 bytes, little endian, so we need to append a zero byte (the most
        # significant one) to parse it as a 32-bit integer. The padding must be b'\x00':
        # the ASCII character b'0' is 0x30 and would add 0x30000000 to the value.
        return unpack("<I", self.packet[11:14] + b'\x00')[0]

    def integration_interval(self) -> float:
        """ Returns the integration interval, in seconds. """

        # Translate to seconds using the block period. block_period is a method, not a property.
        return self.integration_interval_raw * self.block_period()

    @property
    def data_id(self) -> int:
        """ Return the 32-bit data_id field, unparsed. """

        return unpack("<I", self.packet[14:18])[0]

    @property
    def nof_signal_inputs(self) -> int:
        """ Return the nof_signal_inputs field. """

        return unpack("<B", self.packet[18:19])[0]

    @property
    def nof_bytes_per_statistic(self) -> int:
        """ Word size for the payload. """

        return unpack("<B", self.packet[19:20])[0]

    @property
    def nof_statistics_per_packet(self) -> int:
        """ Number of data points in the payload. """

        return unpack("<H", self.packet[20:22])[0]

    @property
    def block_period_raw(self) -> int:
        """ Return the block period, in nanoseconds. """

        return unpack("<H", self.packet[22:24])[0]

    def block_period(self) -> float:
        """ Return the block period, in seconds. """

        return self.block_period_raw / 1e9

    @property
    def block_serial_number(self) -> int:
        """ Block index since epoch (1970). """

        return unpack("<Q", self.packet[24:32])[0]

    def timestamp(self) -> datetime:
        """ Returns the timestamp of the data in this packet, as a timezone-aware datetime in UTC. """

        # seconds since epoch = number of blocks since epoch * duration of one block
        seconds_since_epoch = self.block_serial_number * self.block_period()

        return datetime.fromtimestamp(seconds_since_epoch, timezone.utc)

    def header(self) -> dict:
        """ Return the header as a dict. """

        return {
            "marker": self.marker,
            "version_id": self.version_id,
            "observation_id": self.observation_id,
            "station_id": self.station_id,
            "source_info": self.source_info,
            "reserved": self.reserved,
            "integration_interval_raw": self.integration_interval_raw,
            "integration_interval": self.integration_interval(),
            "data_id": self.data_id,
            "nof_signal_inputs": self.nof_signal_inputs,
            "nof_bytes_per_statistic": self.nof_bytes_per_statistic,
            "nof_statistics_per_packet": self.nof_statistics_per_packet,
            "block_period_raw": self.block_period_raw,
            "block_period": self.block_period(),
            "block_serial_number": self.block_serial_number,
            "timestamp": self.timestamp(),
        }

    @property
    def payload_sst(self) -> numpy.ndarray:
        """ The payload of this packet, interpreted as SST data.

            Returns a 1-dimensional array of nof_statistics_per_packet unsigned values, each
            nof_bytes_per_statistic bytes wide in the packet. Raises KeyError if
            nof_bytes_per_statistic is not 1, 2, 4 or 8. """

        # derive which and how many elements to read from the packet header
        bytes_to_unsigned_struct_type = {1: 'B', 2: 'H', 4: 'I', 8: 'Q'}
        format_str = "<{}{}".format(self.nof_statistics_per_packet, bytes_to_unsigned_struct_type[self.nof_bytes_per_statistic])

        # the payload starts right after the 32-byte header
        return numpy.array(unpack(format_str, self.packet[32:32 + calcsize(format_str)]))
if __name__ == "__main__":
    # Script mode: decode a single packet supplied on stdin and dump it.
    import sys
    from pprint import pprint

    # Slurp stdin until EOF. Only the first packet is parsed; no attempt is made
    # to detect where that packet ends, so any trailing bytes are simply ignored.
    raw_bytes = sys.stdin.buffer.read()
    stats_packet = StatisticsPacket(raw_bytes)

    # Show the decoded header first, then the payload interpreted as SST data.
    pprint(stats_packet.header())
    pprint(stats_packet.payload_sst)