from struct import unpack, calcsize
from datetime import datetime, timezone
from typing import Union

__all__ = ["StatisticsPacket"]


def extract_bits(value, first, last=None):
    """ Return bits [first:last] (both inclusive, bit 0 = LSB) from value.

        If `last` is omitted, only the single bit `first` is returned. """

    # default last to first
    if last is None:
        last = first

    # Shift the wanted range down to bit 0, then mask off everything above it.
    width = last - first + 1
    return (value >> first) & ((1 << width) - 1)


class StatisticsPacket:
    """
       Models a statistics UDP packet from SDP.

       Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers).

       The header occupies bytes 0..31 and multi-byte fields are parsed as
       little endian. Fields are decoded lazily on access; accessing a field of
       a truncated packet raises struct.error.
    """

    # Maps the size of one statistic (in bytes) onto the matching unsigned struct type.
    _UNSIGNED_STRUCT_TYPES = {1: 'B', 2: 'H', 4: 'I', 8: 'Q'}

    def __init__(self, packet: bytes):
        self.packet = packet

    @property
    def marker(self) -> Union[str, bytes]:
        """ Return the type of statistic:

            'S' = SST
            'B' = BST
            'X' = XST

            Markers that are not valid ASCII are returned as a single raw byte. """

        raw_marker = unpack("c", self.packet[0:1])[0]

        try:
            return raw_marker.decode('ascii')
        except UnicodeDecodeError:
            # non-ascii (>127) character, return as binary
            return raw_marker

    @property
    def version_id(self) -> int:
        """ Return the version of this packet. """

        return unpack("B", self.packet[1:2])[0]

    @property
    def observation_id(self) -> int:
        """ Return the ID of the observation running when this packet was generated. """

        return unpack("<I", self.packet[2:6])[0]

    @property
    def station_id(self) -> int:
        """ Return the number of the station this packet was generated on. """

        return unpack("<H", self.packet[6:8])[0]

    @property
    def source_info(self) -> dict:
        """ Return a dict with the source_info flags.

            The undecoded 16-bit value is available under the "_raw" key. """

        bits = unpack("<H", self.packet[8:10])[0]

        return {
            "_raw": bits,
            "antenna_band_index": extract_bits(bits, 15),
            "nyquist_zone_index": extract_bits(bits, 13, 14),
            "t_adc": extract_bits(bits, 12),
            "fsub_type": extract_bits(bits, 11),
            "payload_error": extract_bits(bits, 10),
            "beam_repositioning_flag": extract_bits(bits, 9),
            "subband_calibrated_flag": extract_bits(bits, 8),
            "reserved_0": extract_bits(bits, 5, 7),
            "gn_index": extract_bits(bits, 0, 4),
        }

    @property
    def reserved_0(self) -> bytes:
        """ Reserved bytes. """

        return self.packet[10:11]

    @property
    def integration_interval_raw(self) -> int:
        """ Returns the integration interval, in blocks. """

        # This field is 3 bytes, little endian. Pad it with a NUL byte as the most
        # significant byte so it can be parsed as a 32-bit integer. The padding must
        # be b'\x00': the ASCII character b'0' is 0x30 and would corrupt the value.
        return unpack("<I", self.packet[11:14] + b'\x00')[0]

    @property
    def integration_interval(self) -> float:
        """ Returns the integration interval, in seconds. """

        # Translate to seconds using the block period
        return self.integration_interval_raw * self.block_period

    @property
    def data_id(self) -> int:
        """ Return the raw 32-bit data_id field. """

        return unpack("<I", self.packet[14:18])[0]

    @property
    def nof_signal_inputs(self) -> int:
        """ Return the nof_signal_inputs field. """

        return unpack("<B", self.packet[18:19])[0]

    @property
    def nof_bytes_per_statistic(self) -> int:
        """ Return the size of a single statistic in the payload, in bytes. """

        return unpack("<B", self.packet[19:20])[0]

    @property
    def nof_statistics_per_packet(self) -> int:
        """ Return the number of statistics carried in the payload. """

        return unpack("<H", self.packet[20:22])[0]

    @property
    def block_period_raw(self) -> int:
        """ Return the block period, in nanoseconds. """

        return unpack("<H", self.packet[22:24])[0]

    @property
    def block_period(self) -> float:
        """ Return the block period, in seconds. """

        return self.block_period_raw / 1e9

    @property
    def block_serial_number(self) -> int:
        """ Return the 64-bit block serial number. """

        return unpack("<Q", self.packet[24:32])[0]

    @property
    def timestamp(self) -> datetime:
        """ Returns the timestamp of the data in this packet, as a timezone-aware
            datetime in UTC. """

        # block serial number x block period = seconds since the UNIX epoch
        return datetime.fromtimestamp(self.block_serial_number * self.block_period, timezone.utc)

    def header(self) -> dict:
        """ Return the header as a dict. """

        return {
            "marker": self.marker,
            "version_id": self.version_id,
            "observation_id": self.observation_id,
            "station_id": self.station_id,
            "source_info": self.source_info,
            "reserved_0": self.reserved_0,
            "integration_interval_raw": self.integration_interval_raw,
            "integration_interval": self.integration_interval,
            "data_id": self.data_id,
            "nof_signal_inputs": self.nof_signal_inputs,
            "nof_bytes_per_statistic": self.nof_bytes_per_statistic,
            "nof_statistics_per_packet": self.nof_statistics_per_packet,
            "block_period_raw": self.block_period_raw,
            "block_period": self.block_period,
            "block_serial_number": self.block_serial_number,
            "timestamp": self.timestamp,
        }

    @property
    def payload_sst(self):
        """ The payload of this packet, interpreted as SST data.

            Returns a list of nof_statistics_per_packet unsigned integers, each
            nof_bytes_per_statistic bytes wide, read little endian from byte 32
            onwards. Raises KeyError if nof_bytes_per_statistic is not 1, 2, 4 or 8. """

        struct_type = self._UNSIGNED_STRUCT_TYPES[self.nof_bytes_per_statistic]
        format_str = f"<{self.nof_statistics_per_packet}{struct_type}"

        # The payload starts directly after the 32-byte header.
        return list(unpack(format_str, self.packet[32:32 + calcsize(format_str)]))


if __name__ == "__main__":
    # parse one packet from stdin
    import sys
    import pprint

    data = sys.stdin.buffer.read()
    packet = StatisticsPacket(data)

    pprint.pprint(packet.header())
    pprint.pprint(packet.payload_sst)