Newer
Older

Jan David Mol
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from struct import unpack
from datetime import datetime, timezone
def extract_bits(value, first, last=None):
    """ Extract the inclusive bit range first..last from value.

        Bit 0 is the least significant bit. If last is omitted, only the
        single bit at position first is returned. The result is shifted
        down so that bit `first` ends up at position 0.
    """

    # a missing upper bound selects a field that is one bit wide
    upper = first if last is None else last

    # move the wanted field down to bit 0, then mask away everything above it
    shifted = value >> first
    mask = (1 << (upper - first + 1)) - 1

    return shifted & mask
class StatisticsPacket:
    """
       Models a statistics UDP packet from SDP.

       Packets are expected to be UDP payload only.

       The first 32 bytes are treated as the header; every header field is
       exposed as a read-only property that is parsed from the raw bytes on
       each access. Everything from byte 32 onwards is the payload.
    """

    # NOTE(review): all multi-byte fields are parsed as little-endian ("<").
    # Confirm this against the SDP packet definition.

    def __init__(self, packet: bytes):
        """ Wrap the raw bytes of a single packet. No parsing or validation happens here. """
        self.packet = packet

    @property
    def marker(self) -> str:
        """ Return the type of statistic:

            'S' = SST
            'B' = BST
            'X' = XST

            If the marker byte is not valid ASCII, it is returned unchanged
            as a `bytes` object of length 1 instead of a `str`.
        """

        raw_marker = unpack("c", self.packet[0:1])[0]

        try:
            return raw_marker.decode('ascii')
        except UnicodeDecodeError:
            # non-ascii (>127) character, return as binary
            return raw_marker

    @property
    def version_id(self) -> int:
        """ Return the version of this packet. """

        return unpack("B", self.packet[1:2])[0]

    @property
    def observation_id(self) -> int:
        """ Return the ID of the observation running when this packet was generated. """

        return unpack("<I", self.packet[2:6])[0]

    @property
    def station_id(self) -> int:
        """ Return the number of the station this packet was generated on. """

        return unpack("<H", self.packet[6:8])[0]

    @property
    def source_info(self) -> dict:
        """ Return a dict with the source_info flags.

            The 16-bit field at bytes 8-9 is split into its individual bit
            fields. The unparsed value is included under the key "_raw".
        """

        bits = unpack("<H", self.packet[8:10])[0]

        return {
            "_raw": bits,
            "antenna_band_index": extract_bits(bits, 15),
            "nyquist_zone_index": extract_bits(bits, 13, 14),
            "t_adc": extract_bits(bits, 12),
            "fsub_type": extract_bits(bits, 11),
            "payload_error": extract_bits(bits, 10),
            "beam_repositioning_flag": extract_bits(bits, 9),
            "subband_calibrated_flag": extract_bits(bits, 8),
            "reserved_0": extract_bits(bits, 5, 7),
            "gn_index": extract_bits(bits, 0, 4),
        }

    @property
    def reserved_0(self) -> bytes:
        """ Reserved bytes. """

        return self.packet[10:11]

    @property
    def integration_interval(self) -> float:
        """ Returns the integration interval, in seconds. """

        # This field is 3 bytes, little endian. struct has no 3-byte integer
        # format, so pad it with a zero *byte* as the most significant byte to
        # parse it as a 32-bit integer. The padding must be b'\x00': the
        # character b'0' is 0x30 and would add 0x30000000 to the value.
        raw_interval = unpack("<I", self.packet[11:14] + b'\x00')[0]

        # Translate to seconds using the block period
        return raw_interval * self.block_period

    @property
    def data_id(self) -> int:
        """ Return the raw data_id field (bytes 14-17, unsigned 32-bit). """

        return unpack("<I", self.packet[14:18])[0]

    @property
    def nof_signal_inputs(self) -> int:
        """ Return the nof_signal_inputs field (byte 18, unsigned 8-bit). """

        return unpack("<B", self.packet[18:19])[0]

    @property
    def nof_bytes_per_statistic(self) -> int:
        """ Return the nof_bytes_per_statistic field (byte 19, unsigned 8-bit). """

        return unpack("<B", self.packet[19:20])[0]

    @property
    def nof_statistics_per_packet(self) -> int:
        """ Return the nof_statistics_per_packet field (bytes 20-21, unsigned 16-bit). """

        return unpack("<H", self.packet[20:22])[0]

    @property
    def block_period(self) -> float:
        """ Return the block period, in seconds. """

        # the raw 16-bit value is scaled by 1e-9, i.e. it is taken to be in nanoseconds
        return unpack("<H", self.packet[22:24])[0] / 1e9

    @property
    def block_serial_number(self) -> int:
        """ Return the block_serial_number field (bytes 24-31, unsigned 64-bit). """

        return unpack("<Q", self.packet[24:32])[0]

    @property
    def timestamp(self) -> datetime:
        """ Returns the timestamp of the data in this packet.

            Computed as block_serial_number * block_period seconds since the
            UNIX epoch, and returned as a timezone-aware datetime in UTC.
        """

        return datetime.fromtimestamp(self.block_serial_number * self.block_period, timezone.utc)

    def header(self) -> dict:
        """ Return the header as a dict.

            Note that this is a regular method, not a property.
        """

        return {
            "marker": self.marker,
            "version_id": self.version_id,
            "observation_id": self.observation_id,
            "station_id": self.station_id,
            "source_info": self.source_info,
            "reserved_0": self.reserved_0,
            "integration_interval": self.integration_interval,
            "data_id": self.data_id,
            "nof_signal_inputs": self.nof_signal_inputs,
            "nof_bytes_per_statistic": self.nof_bytes_per_statistic,
            "nof_statistics_per_packet": self.nof_statistics_per_packet,
            "block_period": self.block_period,
            "block_serial_number": self.block_serial_number,
            "timestamp": self.timestamp,
        }

    @property
    def payload_sst(self) -> list:
        """ The payload of this packet, interpreted as SST data.

            Returns a list of nof_statistics_per_packet unsigned 64-bit
            integers. Raises struct.error if the number of bytes after the
            32-byte header does not match exactly.
        """

        return list(unpack("<{}Q".format(self.nof_statistics_per_packet), self.packet[32:]))
if __name__ == "__main__":
    # parse one packet from stdin
    import sys
    import pprint

    # read the raw bytes from the binary buffer, so no text decoding is applied
    data = sys.stdin.buffer.read()
    packet = StatisticsPacket(data)

    # header() is a regular method...
    pprint.pprint(packet.header())

    # ...but payload_sst is a property: calling it would try to call the
    # returned list and raise a TypeError.
    pprint.pprint(packet.payload_sst)