Newer
Older

Jan David Mol
committed
from struct import unpack, calcsize

Jan David Mol
committed
from datetime import datetime, timezone
import numpy

Jan David Mol
committed

Jan David Mol
committed
__all__ = ["StatisticsPacket"]
def get_bit_value(value: bytes, first_bit: int, last_bit:int=None) -> int:
""" Return bits [first_bit:last_bit] from value, and return their integer value. Bit 0 = LSB.
For example, extracting bits 2-3 from b'01100' returns 11 binary = 3 decimal:
get_bit_value(b'01100', 2, 3) == 3
If 'last_bit' is not given, just the value of bit 'first_bit' is returned. """

Jan David Mol
committed
# default last_bit to first_bit
if last_bit is None:
last_bit = first_bit

Jan David Mol
committed
return value >> first_bit & ((1 << (last_bit - first_bit + 1)) - 1)

Jan David Mol
committed
class StatisticsPacket(object):
"""
Models a statistics UDP packet from SDP.

Jan David Mol
committed
Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers).

Jan David Mol
committed
"""
def __init__(self, packet: bytes):
self.packet = packet
# Only parse valid packets
if self.marker not in 'SBX':
raise ValueError("Invalid SDP statistics packet: packet marker (first byte) is '{}', not one of 'SBX'.".format(self.marker))

Jan David Mol
committed
@property
def marker(self) -> str:
""" Return the type of statistic:
'S' = SST
'B' = BST
'X' = XST
"""
raw_marker = unpack("c",self.packet[0:1])[0]
try:
return raw_marker.decode('ascii')

Jan David Mol
committed
except UnicodeDecodeError:
# non-ascii (>127) character, return as binary
#
# this is typically not visible to the user, as these packets are not SDP statistics packets,
# which the constructor will refuse to accept.

Jan David Mol
committed
return raw_marker
@property
def version_id(self) -> int:
""" Return the version of this packet. """
return unpack("B",self.packet[1:2])[0]
@property
def observation_id(self) -> int:
""" Return the ID of the observation running when this packet was generated. """
return unpack("<I",self.packet[2:6])[0]
@property
def station_id(self) -> int:
""" Return the number of the station this packet was generated on. """
return unpack("<H",self.packet[6:8])[0]
@property
def source_info(self) -> int:
""" Return a dict with the source_info flags. The dict contains the following fields:
_raw: raw value of the source_info field in the packet, as an integer.
antenna_band_index: antenna type. 0 = low band, 1 = high band.
nyquist_zone_index: nyquist zone of filter:
0 = 0 -- 1/2 * t_adc Hz (low band),
1 = 1/2 * t_adc -- t_adc Hz (high band),
2 = t_adc -- 3/2 * t_adc Hz (high band).
t_adc: sampling clock. 0 = 160 MHz, 1 = 200 MHz.
fsub_type: sampling method. 0 = critically sampled, 1 = oversampled.
payload_error: 0 = data is ok, 1 = data is corrupted (a fault was encountered).
beam_repositioning_flag: 0 = data is ok, 1 = beam got repositioned during packet construction (BST only).
subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not
reserved: reserved bits
gn_index: global index of FPGA that emitted this packet. """

Jan David Mol
committed
bits = unpack("<H",self.packet[8:10])[0]
return {
"_raw": bits,
"antenna_band_index": get_bit_value(bits, 15),
"nyquist_zone_index": get_bit_value(bits, 13, 14),
"t_adc": get_bit_value(bits, 12),
"fsub_type": get_bit_value(bits, 11),
"payload_error": get_bit_value(bits, 10),
"beam_repositioning_flag": get_bit_value(bits, 9),
"subband_calibrated_flag": get_bit_value(bits, 8),
"reserved": get_bit_value(bits, 5, 7),
"gn_index": get_bit_value(bits, 0, 4),

Jan David Mol
committed
}
@property

Jan David Mol
committed
def reserved(self) -> bytes:

Jan David Mol
committed
""" Reserved bytes. """
return self.packet[10:11]
@property

Jan David Mol
committed
def integration_interval_raw(self) -> int:
""" Returns the integration interval, in blocks. """

Jan David Mol
committed
# This field is 3 bytes, little endian, so we need to append a 0 to parse it as a 32-bit integer.

Jan David Mol
committed
return unpack("<I", self.packet[11:14] + b'0')[0]
def integration_interval(self) -> float:
""" Returns the integration interval, in seconds. """

Jan David Mol
committed
# Translate to seconds using the block period
return self.integration_interval_raw * self.block_period()

Jan David Mol
committed
@property
def data_id(self) -> int:
""" Returns the generic data identifier. """

Jan David Mol
committed
return unpack("<I",self.packet[14:18])[0]
@property
def nof_signal_inputs(self) -> int:
""" Number of inputs that were used for constructing the payload. """

Jan David Mol
committed
return unpack("<B",self.packet[18:19])[0]
@property
def nof_bytes_per_statistic(self) -> int:

Jan David Mol
committed
""" Word size for the payload. """

Jan David Mol
committed
return unpack("<B",self.packet[19:20])[0]
@property
def nof_statistics_per_packet(self) -> int:

Jan David Mol
committed
""" Number of data points in the payload. """

Jan David Mol
committed
return unpack("<H",self.packet[20:22])[0]

Jan David Mol
committed
@property
def block_period_raw(self) -> int:
""" Return the block period, in nanoseconds. """
return unpack("<H",self.packet[22:24])[0]

Jan David Mol
committed
def block_period(self) -> float:
""" Return the block period, in seconds. """

Jan David Mol
committed
return self.block_period_raw / 1e9

Jan David Mol
committed
@property
def block_serial_number(self) -> int:

Jan David Mol
committed
""" Block index since epoch (1970). """

Jan David Mol
committed
return unpack("<Q",self.packet[24:32])[0]
def timestamp(self) -> datetime:
""" Returns the timestamp of the data in this packet. """
return datetime.fromtimestamp(self.block_serial_number * self.block_period(), timezone.utc)

Jan David Mol
committed
def header(self) -> dict:
""" Return all the header fields as a dict. """

Jan David Mol
committed

Jan David Mol
committed
return {
"marker": self.marker,
"version_id": self.version_id,
"observation_id": self.observation_id,
"station_id": self.station_id,
"source_info": self.source_info,

Jan David Mol
committed
"reserved": self.reserved,

Jan David Mol
committed
"integration_interval_raw": self.integration_interval_raw,

Jan David Mol
committed
"integration_interval": self.integration_interval(),

Jan David Mol
committed
"data_id": self.data_id,
"nof_signal_inputs": self.nof_signal_inputs,
"nof_bytes_per_statistic": self.nof_bytes_per_statistic,
"nof_statistics_per_packet": self.nof_statistics_per_packet,

Jan David Mol
committed
"block_period_raw": self.block_period_raw,

Jan David Mol
committed
"block_period": self.block_period(),

Jan David Mol
committed
"block_serial_number": self.block_serial_number,

Jan David Mol
committed
"timestamp": self.timestamp(),

Jan David Mol
committed
}
@property
def payload_sst(self) -> numpy.array:

Jan David Mol
committed
""" The payload of this packet, interpreted as SST data. """
if self.marker != 'S':
raise Exception("Payload of SST requested of a non-SST packet. Actual packet marker is '{}', but must be 'S'.".format(self.marker))

Jan David Mol
committed
# derive which and how many elements to read from the packet header
bytecount_to_unsigned_struct_type = { 1: 'B', 2: 'H', 4: 'I', 8: 'Q' }
format_str = "<{}{}".format(self.nof_statistics_per_packet, bytecount_to_unsigned_struct_type[self.nof_bytes_per_statistic])

Jan David Mol
committed
return numpy.array(unpack(format_str, self.packet[32:32+calcsize(format_str)]))

Jan David Mol
committed

Jan David Mol
committed
if __name__ == "__main__":
# parse one packet from stdin
import sys
import pprint
# read all of stdin, even though we only parse the first packet. we're too lazy to intelligently decide when
# the packet is complete and can stop reading.

Jan David Mol
committed
data = sys.stdin.buffer.read()
packet = StatisticsPacket(data)

Jan David Mol
committed
pprint.pprint(packet.header())

Jan David Mol
committed
pprint.pprint(packet.payload_sst)