Newer
Older

Jan David Mol
committed
from struct import unpack, calcsize

Jan David Mol
committed
from datetime import datetime, timezone
import numpy

Jan David Mol
committed

Jan David Mol
committed
__all__ = ["StatisticsPacket"]
def get_bit_value(value: int, first_bit: int, last_bit: int = None) -> int:
    """ Return bits [first_bit:last_bit] (both inclusive) from value, as an integer. Bit 0 = LSB.

        For example, extracting bits 2-3 from 0b01100 returns 11 binary = 3 decimal:

            get_bit_value(0b01100, 2, 3) == 3

        If 'last_bit' is not given, just the value of bit 'first_bit' is returned.

        'value' must be an integer (not a bytes object): it is shifted and masked directly. """

    # default last_bit to first_bit
    if last_bit is None:
        last_bit = first_bit

    # one set bit for every requested bit position
    mask = (1 << (last_bit - first_bit + 1)) - 1

    # move the requested bits down to position 0, then drop everything above them
    return (value >> first_bit) & mask

Jan David Mol
committed
class StatisticsPacket(object):
    """
       Models a statistics UDP packet from SDP.

       Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers).

       Header layout as parsed by this class (byte offsets into the packet;
       multi-byte fields are little endian):

          0      marker
          1      version_id
          2-5    observation_id
          6-7    station_id
          8-9    source_info
          10     reserved
          11-13  integration_interval_raw
          14-17  data_id
          18     nof_signal_inputs
          19     nof_bytes_per_statistic
          20-21  nof_statistics_per_packet
          22-23  block_period_raw
          24-31  block_serial_number
          32-    payload

       Fields are decoded lazily on every access; nothing is validated or cached
       at construction time.
    """

    def __init__(self, packet: bytes):
        self.packet = packet

    @property
    def marker(self) -> str:
        """ Return the type of statistic:

            'S' = SST
            'B' = BST
            'X' = XST

            If the marker byte is not valid ASCII, it is returned as a bytes object instead of a str.
        """

        raw_marker = unpack("c", self.packet[0:1])[0]

        try:
            return raw_marker.decode('ascii')
        except UnicodeDecodeError:
            # non-ascii (>127) character, return as binary
            return raw_marker

    @property
    def version_id(self) -> int:
        """ Return the version of this packet. """

        return unpack("B", self.packet[1:2])[0]

    @property
    def observation_id(self) -> int:
        """ Return the ID of the observation running when this packet was generated. """

        return unpack("<I", self.packet[2:6])[0]

    @property
    def station_id(self) -> int:
        """ Return the number of the station this packet was generated on. """

        return unpack("<H", self.packet[6:8])[0]

    @property
    def source_info(self) -> dict:
        """ Return a dict with the source_info flags. The dict contains the following fields:

            _raw:                    raw value of the source_info field in the packet, as an integer.
            antenna_band_index:      antenna type. 0 = low band, 1 = high band.
            nyquist_zone_index:      nyquist zone of filter:
                                         0 =           0 -- 1/2 * t_adc Hz (low band),
                                         1 = 1/2 * t_adc --       t_adc Hz (high band),
                                         2 =       t_adc -- 3/2 * t_adc Hz (high band).
            t_adc:                   sampling clock. 0 = 160 MHz, 1 = 200 MHz.
            fsub_type:               sampling method. 0 = critically sampled, 1 = oversampled.
            payload_error:           0 = data is ok, 1 = data is corrupted (a fault was encountered).
            beam_repositioning_flag: 0 = data is ok, 1 = beam got repositioned during packet construction (BST only).
            subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not
            reserved:                reserved bits
            gn_index:                global index of FPGA that emitted this packet. """

        bits = unpack("<H", self.packet[8:10])[0]

        return {
            "_raw": bits,
            "antenna_band_index": get_bit_value(bits, 15),
            "nyquist_zone_index": get_bit_value(bits, 13, 14),
            "t_adc": get_bit_value(bits, 12),
            "fsub_type": get_bit_value(bits, 11),
            "payload_error": get_bit_value(bits, 10),
            "beam_repositioning_flag": get_bit_value(bits, 9),
            "subband_calibrated_flag": get_bit_value(bits, 8),
            "reserved": get_bit_value(bits, 5, 7),
            "gn_index": get_bit_value(bits, 0, 4),
        }

    @property
    def reserved(self) -> bytes:
        """ Reserved bytes. """

        return self.packet[10:11]

    @property
    def integration_interval_raw(self) -> int:
        """ Returns the integration interval, in blocks. """

        # This field is 3 bytes, little endian, so we need to append a zero byte (the most
        # significant one) to parse it as a 32-bit integer. Note that this must be b'\x00':
        # b'0' is the ASCII digit (0x30) and would corrupt the value.
        return unpack("<I", self.packet[11:14] + b'\x00')[0]

    def integration_interval(self) -> float:
        """ Returns the integration interval, in seconds. """

        # Translate to seconds using the block period. block_period is a method, not a
        # property, so it must be called.
        return self.integration_interval_raw * self.block_period()

    @property
    def data_id(self) -> int:
        """ Returns the generic data identifier. """

        return unpack("<I", self.packet[14:18])[0]

    @property
    def nof_signal_inputs(self) -> int:
        """ Number of inputs that were used for constructing the payload. """

        return unpack("<B", self.packet[18:19])[0]

    @property
    def nof_bytes_per_statistic(self) -> int:
        """ Word size for the payload. """

        return unpack("<B", self.packet[19:20])[0]

    @property
    def nof_statistics_per_packet(self) -> int:
        """ Number of data points in the payload. """

        return unpack("<H", self.packet[20:22])[0]

    @property
    def block_period_raw(self) -> int:
        """ Return the block period, in nanoseconds. """

        return unpack("<H", self.packet[22:24])[0]

    def block_period(self) -> float:
        """ Return the block period, in seconds. """

        return self.block_period_raw / 1e9

    @property
    def block_serial_number(self) -> int:
        """ Block index since epoch (1970). """

        return unpack("<Q", self.packet[24:32])[0]

    def timestamp(self) -> datetime:
        """ Returns the timestamp of the data in this packet, as a timezone-aware (UTC) datetime. """

        # block_period is a method, not a property, so it must be called.
        return datetime.fromtimestamp(self.block_serial_number * self.block_period(), timezone.utc)

    def header(self) -> dict:
        """ Return all the header fields as a dict. """

        return {
            "marker": self.marker,
            "version_id": self.version_id,
            "observation_id": self.observation_id,
            "station_id": self.station_id,
            "source_info": self.source_info,
            "reserved": self.reserved,
            "integration_interval_raw": self.integration_interval_raw,
            "integration_interval": self.integration_interval(),
            "data_id": self.data_id,
            "nof_signal_inputs": self.nof_signal_inputs,
            "nof_bytes_per_statistic": self.nof_bytes_per_statistic,
            "nof_statistics_per_packet": self.nof_statistics_per_packet,
            "block_period_raw": self.block_period_raw,
            "block_period": self.block_period(),
            "block_serial_number": self.block_serial_number,
            "timestamp": self.timestamp(),
        }

    @property
    def payload_sst(self) -> numpy.ndarray:
        """ The payload of this packet, interpreted as SST data.

            Raises an Exception if the packet marker is not 'S'. A word size
            (nof_bytes_per_statistic) other than 1, 2, 4 or 8 results in a KeyError. """

        if self.marker != 'S':
            raise Exception("Payload of SST requested of a non-SST packet. Actual packet marker is '{}', but must be 'S'.".format(self.marker))

        # derive which and how many elements to read from the packet header
        bytecount_to_unsigned_struct_type = {1: 'B', 2: 'H', 4: 'I', 8: 'Q'}
        format_str = "<{}{}".format(self.nof_statistics_per_packet, bytecount_to_unsigned_struct_type[self.nof_bytes_per_statistic])

        # the payload starts right after the 32-byte header
        return numpy.array(unpack(format_str, self.packet[32:32 + calcsize(format_str)]))

Jan David Mol
committed

Jan David Mol
committed
if __name__ == "__main__":
    # Command-line helper: decode a single packet supplied on stdin and dump it.
    import pprint
    import sys

    # Slurp stdin completely. Only the first packet gets parsed; we do not try to
    # work out where that packet ends, so we cannot stop reading any earlier.
    raw_input_bytes = sys.stdin.buffer.read()
    parsed = StatisticsPacket(raw_input_bytes)

    # header fields first ...
    pprint.pprint(parsed.header())

    # ... then the payload, interpreted as SST data
    pprint.pprint(parsed.payload_sst)