Newer
Older

Jan David Mol
committed
from struct import unpack, calcsize

Jan David Mol
committed
from datetime import datetime, timezone

Jan David Mol
committed
__all__ = ["StatisticsPacket"]

Jan David Mol
committed
def extract_bits(value, first, last=None):
""" Return bits [first:last] from value. """
# default last to first
if last is None:
last = first
return value >> first & ((1 << (last - first + 1)) - 1)
class StatisticsPacket(object):
"""
Models a statistics UDP packet from SDP.

Jan David Mol
committed
Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers).

Jan David Mol
committed
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""
def __init__(self, packet: bytes):
self.packet = packet
@property
def marker(self) -> str:
""" Return the type of statistic:
'S' = SST
'B' = BST
'X' = XST
"""
raw_marker = unpack("c",self.packet[0:1])[0]
try:
return raw_marker.decode('ascii')
except UnicodeDecodeError:
# non-ascii (>127) character, return as binary
return raw_marker
@property
def version_id(self) -> int:
""" Return the version of this packet. """
return unpack("B",self.packet[1:2])[0]
@property
def observation_id(self) -> int:
""" Return the ID of the observation running when this packet was generated. """
return unpack("<I",self.packet[2:6])[0]
@property
def station_id(self) -> int:
""" Return the number of the station this packet was generated on. """
return unpack("<H",self.packet[6:8])[0]
@property
def source_info(self) -> int:
""" Return a dict with the source_info flags. """
bits = unpack("<H",self.packet[8:10])[0]
return {
"_raw": bits,
"antenna_band_index": extract_bits(bits, 15),
"nyquist_zone_index": extract_bits(bits, 13, 14),
"t_adc": extract_bits(bits, 12),
"fsub_type": extract_bits(bits, 11),
"payload_error": extract_bits(bits, 10),
"beam_repositioning_flag": extract_bits(bits, 9),
"subband_calibrated_flag": extract_bits(bits, 8),
"reserved_0": extract_bits(bits, 5, 7),
"gn_index": extract_bits(bits, 0, 4),
}
@property
def reserved_0(self) -> bytes:
""" Reserved bytes. """
return self.packet[10:11]
@property

Jan David Mol
committed
def integration_interval_raw(self) -> int:
""" Returns the integration interval, in blocks. """

Jan David Mol
committed
# This field is 3 bytes, little endian, so we need to append a 0 to parse it as a 32-bit integer.

Jan David Mol
committed
return unpack("<I", self.packet[11:14] + b'0')[0]
@property
def integration_interval(self) -> float:
""" Returns the integration interval, in seconds. """

Jan David Mol
committed
# Translate to seconds using the block period

Jan David Mol
committed
return self.integration_interval_raw * self.block_period

Jan David Mol
committed
@property
def data_id(self) -> int:
return unpack("<I",self.packet[14:18])[0]
@property
def nof_signal_inputs(self) -> int:
return unpack("<B",self.packet[18:19])[0]
@property
def nof_bytes_per_statistic(self) -> int:
return unpack("<B",self.packet[19:20])[0]
@property
def nof_statistics_per_packet(self) -> int:
return unpack("<H",self.packet[20:22])[0]

Jan David Mol
committed
@property
def block_period_raw(self) -> int:
""" Return the block period, in nanoseconds. """
return unpack("<H",self.packet[22:24])[0]

Jan David Mol
committed
@property
def block_period(self) -> float:
""" Return the block period, in seconds. """

Jan David Mol
committed
return self.block_period_raw / 1e9

Jan David Mol
committed
@property
def block_serial_number(self) -> int:
return unpack("<Q",self.packet[24:32])[0]
@property
def timestamp(self) -> datetime:
""" Returns the timestamp of the data in this packet. """
return datetime.fromtimestamp(self.block_serial_number * self.block_period, timezone.utc)
def header(self) -> dict:
""" Return the header as a dict. """
return {
"marker": self.marker,
"version_id": self.version_id,
"observation_id": self.observation_id,
"station_id": self.station_id,
"source_info": self.source_info,
"reserved_0": self.reserved_0,

Jan David Mol
committed
"integration_interval_raw": self.integration_interval_raw,

Jan David Mol
committed
"integration_interval": self.integration_interval,
"data_id": self.data_id,
"nof_signal_inputs": self.nof_signal_inputs,
"nof_bytes_per_statistic": self.nof_bytes_per_statistic,
"nof_statistics_per_packet": self.nof_statistics_per_packet,

Jan David Mol
committed
"block_period_raw": self.block_period_raw,

Jan David Mol
committed
"block_period": self.block_period,
"block_serial_number": self.block_serial_number,
"timestamp": self.timestamp,
}
@property
def payload_sst(self):
""" The payload of this packet, interpreted as SST data. """

Jan David Mol
committed
bytes_to_unsigned_struct_type = { 1: 'B', 2: 'H', 4: 'I', 8: 'Q' }
format_str = "<{}{}".format(self.nof_statistics_per_packet, bytes_to_unsigned_struct_type[self.nof_bytes_per_statistic])
return list(unpack(format_str, self.packet[32:32+calcsize(format_str)]))

Jan David Mol
committed
if __name__ == "__main__":
# parse one packet from stdin
import sys
import pprint
data = sys.stdin.buffer.read()
packet = StatisticsPacket(data)
pprint.pprint(packet.header())

Jan David Mol
committed
pprint.pprint(packet.payload_sst)