Newer
Older

Jan David Mol
committed
from struct import unpack, calcsize

Jan David Mol
committed
from datetime import datetime, timezone

Jan David Mol
committed
__all__ = ["StatisticsPacket"]

Jan David Mol
committed
def extract_bits(value: int, first: int, last=None) -> int:
    """ Return bits [first:last] from value, both bounds inclusive. Bit 0 = LSB.

        If last is not given, the value of the single bit 'first' is returned.

        Raises ValueError if first is negative or if last is smaller than first.
    """

    # default last to first, which selects exactly one bit
    if last is None:
        last = first

    # Reject invalid ranges up front. Without this check most of them already
    # fail with a ValueError from the shift operators, but last == first - 1
    # would silently yield 0 through an empty mask.
    if first < 0 or last < first:
        raise ValueError("invalid bit range [{}:{}]".format(first, last))

    # Move the requested bits down to the LSB, then mask off everything above them.
    nof_bits = last - first + 1
    return (value >> first) & ((1 << nof_bits) - 1)
class StatisticsPacket(object):
    """
       Models a statistics UDP packet from SDP.

       Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers).

       Header layout as parsed by the accessors below (byte offsets into the
       packet; all multi-byte fields are little endian):

          0       marker
          1       version_id
          2-5     observation_id
          6-7     station_id
          8-9     source_info
          10      reserved
          11-13   integration_interval_raw
          14-17   data_id
          18      nof_signal_inputs
          19      nof_bytes_per_statistic
          20-21   nof_statistics_per_packet
          22-23   block_period_raw
          24-31   block_serial_number
          32-     payload

       Fields are decoded from the raw bytes on every access; nothing is cached.
       Accessing a field that lies beyond the end of a truncated packet raises
       struct.error.
    """

    def __init__(self, packet: bytes):
        """ Wrap the raw packet bytes. No parsing or validation happens here. """

        self.packet = packet

    @property
    def marker(self) -> str:
        """ Return the type of statistic:

            'S' = SST
            'B' = BST
            'X' = XST

            If the marker byte is not ASCII, it is returned as a bytes object of
            length 1 instead of a str.
        """

        raw_marker = unpack("c", self.packet[0:1])[0]

        try:
            return raw_marker.decode('ascii')
        except UnicodeDecodeError:
            # non-ascii (>127) character, return as binary
            return raw_marker

    @property
    def version_id(self) -> int:
        """ Return the version of this packet. """

        return unpack("B", self.packet[1:2])[0]

    @property
    def observation_id(self) -> int:
        """ Return the ID of the observation running when this packet was generated. """

        return unpack("<I", self.packet[2:6])[0]

    @property
    def station_id(self) -> int:
        """ Return the number of the station this packet was generated on. """

        return unpack("<H", self.packet[6:8])[0]

    @property
    def source_info(self) -> dict:
        """ Return a dict with the source_info flags.

            The undecoded 16-bit value is included under the key "_raw".
        """

        bits = unpack("<H", self.packet[8:10])[0]

        return {
            "_raw": bits,
            "antenna_band_index": extract_bits(bits, 15),
            "nyquist_zone_index": extract_bits(bits, 13, 14),
            "t_adc": extract_bits(bits, 12),
            "fsub_type": extract_bits(bits, 11),
            "payload_error": extract_bits(bits, 10),
            "beam_repositioning_flag": extract_bits(bits, 9),
            "subband_calibrated_flag": extract_bits(bits, 8),
            "reserved": extract_bits(bits, 5, 7),
            "gn_index": extract_bits(bits, 0, 4),
        }

    @property
    def reserved(self) -> bytes:
        """ Reserved bytes. """

        return self.packet[10:11]

    @property
    def integration_interval_raw(self) -> int:
        """ Returns the integration interval, in blocks. """

        # This field is 3 bytes, little endian, so we need to append a zero byte
        # (the most significant one) to parse it as a 32-bit integer. The padding
        # must be NUL (0x00); an ASCII '0' (0x30) would add 0x30000000 to the value.
        return unpack("<I", self.packet[11:14] + b'\x00')[0]

    def integration_interval(self) -> float:
        """ Returns the integration interval, in seconds. """

        # Translate to seconds using the block period. block_period is a
        # method, not a property, so it must be called.
        return self.integration_interval_raw * self.block_period()

    @property
    def data_id(self) -> int:
        """ Return the raw 32-bit data_id field. """

        return unpack("<I", self.packet[14:18])[0]

    @property
    def nof_signal_inputs(self) -> int:
        """ Return the raw 8-bit nof_signal_inputs field. """

        return unpack("<B", self.packet[18:19])[0]

    @property
    def nof_bytes_per_statistic(self) -> int:
        """ Word size for the payload. """

        return unpack("<B", self.packet[19:20])[0]

    @property
    def nof_statistics_per_packet(self) -> int:
        """ Number of data points in the payload. """

        return unpack("<H", self.packet[20:22])[0]

    @property
    def block_period_raw(self) -> int:
        """ Return the block period, in nanoseconds. """

        return unpack("<H", self.packet[22:24])[0]

    def block_period(self) -> float:
        """ Return the block period, in seconds. """

        return self.block_period_raw / 1e9

    @property
    def block_serial_number(self) -> int:
        """ Block index since epoch (1970). """

        return unpack("<Q", self.packet[24:32])[0]

    def timestamp(self) -> datetime:
        """ Returns the timestamp of the data in this packet, as a timezone-aware datetime in UTC. """

        # block index * seconds per block = seconds since epoch.
        # block_period is a method, not a property, so it must be called.
        return datetime.fromtimestamp(self.block_serial_number * self.block_period(), timezone.utc)

    def header(self) -> dict:
        """ Return the header as a dict. """

        return {
            "marker": self.marker,
            "version_id": self.version_id,
            "observation_id": self.observation_id,
            "station_id": self.station_id,
            "source_info": self.source_info,
            "reserved": self.reserved,
            "integration_interval_raw": self.integration_interval_raw,
            "integration_interval": self.integration_interval(),
            "data_id": self.data_id,
            "nof_signal_inputs": self.nof_signal_inputs,
            "nof_bytes_per_statistic": self.nof_bytes_per_statistic,
            "nof_statistics_per_packet": self.nof_statistics_per_packet,
            "block_period_raw": self.block_period_raw,
            "block_period": self.block_period(),
            "block_serial_number": self.block_serial_number,
            "timestamp": self.timestamp(),
        }

    @property
    def payload_sst(self):
        """ The payload of this packet, interpreted as SST data.

            Returns a list of nof_statistics_per_packet unsigned integers, each
            nof_bytes_per_statistic bytes wide, read little endian from offset 32.

            Raises KeyError if nof_bytes_per_statistic is not 1, 2, 4 or 8.
        """

        # derive which and how many elements to read from the packet header
        bytes_to_unsigned_struct_type = {1: 'B', 2: 'H', 4: 'I', 8: 'Q'}
        format_str = "<{}{}".format(self.nof_statistics_per_packet, bytes_to_unsigned_struct_type[self.nof_bytes_per_statistic])

        return list(unpack(format_str, self.packet[32:32 + calcsize(format_str)]))

Jan David Mol
committed
if __name__ == "__main__":
    # Command-line use: read exactly one packet (UDP payload only) from
    # stdin, then dump its decoded header followed by its SST payload.
    import sys
    import pprint

    stdin_packet = StatisticsPacket(sys.stdin.buffer.read())

    for decoded in (stdin_packet.header(), stdin_packet.payload_sst):
        pprint.pprint(decoded)