from struct import unpack, calcsize from datetime import datetime, timezone import numpy __all__ = ["StatisticsPacket"] def get_bit_value(value: bytes, first_bit: int, last_bit:int=None) -> int: """ Return bits [first_bit:last_bit] from value, and return their integer value. Bit 0 = LSB. For example, extracting bits 2-3 from b'01100' returns 11 binary = 3 decimal: get_bit_value(b'01100', 2, 3) == 3 If 'last_bit' is not given, just the value of bit 'first_bit' is returned. """ # default last_bit to first_bit if last_bit is None: last_bit = first_bit return value >> first_bit & ((1 << (last_bit - first_bit + 1)) - 1) class StatisticsPacket(object): """ Models a statistics UDP packet from SDP. Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). """ def __init__(self, packet: bytes): self.packet = packet # Only parse valid packets if self.marker not in 'SBX': raise ValueError("Invalid SDP statistics packet: packet marker (first byte) is '{}', not one of 'SBX'.".format(self.marker)) @property def marker(self) -> str: """ Return the type of statistic: 'S' = SST 'B' = BST 'X' = XST """ raw_marker = unpack("c",self.packet[0:1])[0] try: return raw_marker.decode('ascii') except UnicodeDecodeError: # non-ascii (>127) character, return as binary # # this is typically not visible to the user, as these packets are not SDP statistics packets, # which the constructor will refuse to accept. return raw_marker @property def version_id(self) -> int: """ Return the version of this packet. """ return unpack("B",self.packet[1:2])[0] @property def observation_id(self) -> int: """ Return the ID of the observation running when this packet was generated. """ return unpack("<I",self.packet[2:6])[0] @property def station_id(self) -> int: """ Return the number of the station this packet was generated on. """ return unpack("<H",self.packet[6:8])[0] @property def source_info(self) -> int: """ Return a dict with the source_info flags. The dict contains the following fields: _raw: raw value of the source_info field in the packet, as an integer. antenna_band_index: antenna type. 0 = low band, 1 = high band. nyquist_zone_index: nyquist zone of filter: 0 = 0 -- 1/2 * t_adc Hz (low band), 1 = 1/2 * t_adc -- t_adc Hz (high band), 2 = t_adc -- 3/2 * t_adc Hz (high band). t_adc: sampling clock. 0 = 160 MHz, 1 = 200 MHz. fsub_type: sampling method. 0 = critically sampled, 1 = oversampled. payload_error: 0 = data is ok, 1 = data is corrupted (a fault was encountered). beam_repositioning_flag: 0 = data is ok, 1 = beam got repositioned during packet construction (BST only). subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not reserved: reserved bits gn_index: global index of FPGA that emitted this packet. """ bits = unpack("<H",self.packet[8:10])[0] return { "_raw": bits, "antenna_band_index": get_bit_value(bits, 15), "nyquist_zone_index": get_bit_value(bits, 13, 14), "t_adc": get_bit_value(bits, 12), "fsub_type": get_bit_value(bits, 11), "payload_error": get_bit_value(bits, 10), "beam_repositioning_flag": get_bit_value(bits, 9), "subband_calibrated_flag": get_bit_value(bits, 8), "reserved": get_bit_value(bits, 5, 7), "gn_index": get_bit_value(bits, 0, 4), } @property def reserved(self) -> bytes: """ Reserved bytes. """ return self.packet[10:11] @property def integration_interval_raw(self) -> int: """ Returns the integration interval, in blocks. """ # This field is 3 bytes, little endian, so we need to append a 0 to parse it as a 32-bit integer. return unpack("<I", self.packet[11:14] + b'0')[0] def integration_interval(self) -> float: """ Returns the integration interval, in seconds. """ # Translate to seconds using the block period return self.integration_interval_raw * self.block_period() @property def data_id(self) -> int: """ Returns the generic data identifier. """ return unpack("<I",self.packet[14:18])[0] @property def nof_signal_inputs(self) -> int: """ Number of inputs that were used for constructing the payload. """ return unpack("<B",self.packet[18:19])[0] @property def nof_bytes_per_statistic(self) -> int: """ Word size for the payload. """ return unpack("<B",self.packet[19:20])[0] @property def nof_statistics_per_packet(self) -> int: """ Number of data points in the payload. """ return unpack("<H",self.packet[20:22])[0] @property def block_period_raw(self) -> int: """ Return the block period, in nanoseconds. """ return unpack("<H",self.packet[22:24])[0] def block_period(self) -> float: """ Return the block period, in seconds. """ return self.block_period_raw / 1e9 @property def block_serial_number(self) -> int: """ Block index since epoch (1970). """ return unpack("<Q",self.packet[24:32])[0] def timestamp(self) -> datetime: """ Returns the timestamp of the data in this packet. """ return datetime.fromtimestamp(self.block_serial_number * self.block_period(), timezone.utc) def header(self) -> dict: """ Return all the header fields as a dict. """ return { "marker": self.marker, "version_id": self.version_id, "observation_id": self.observation_id, "station_id": self.station_id, "source_info": self.source_info, "reserved": self.reserved, "integration_interval_raw": self.integration_interval_raw, "integration_interval": self.integration_interval(), "data_id": self.data_id, "nof_signal_inputs": self.nof_signal_inputs, "nof_bytes_per_statistic": self.nof_bytes_per_statistic, "nof_statistics_per_packet": self.nof_statistics_per_packet, "block_period_raw": self.block_period_raw, "block_period": self.block_period(), "block_serial_number": self.block_serial_number, "timestamp": self.timestamp(), } @property def payload_sst(self) -> numpy.array: """ The payload of this packet, interpreted as SST data. """ if self.marker != 'S': raise Exception("Payload of SST requested of a non-SST packet. Actual packet marker is '{}', but must be 'S'.".format(self.marker)) # derive which and how many elements to read from the packet header bytecount_to_unsigned_struct_type = { 1: 'B', 2: 'H', 4: 'I', 8: 'Q' } format_str = "<{}{}".format(self.nof_statistics_per_packet, bytecount_to_unsigned_struct_type[self.nof_bytes_per_statistic]) return numpy.array(unpack(format_str, self.packet[32:32+calcsize(format_str)])) if __name__ == "__main__": # parse one packet from stdin import sys import pprint # read all of stdin, even though we only parse the first packet. we're too lazy to intelligently decide when # the packet is complete and can stop reading. data = sys.stdin.buffer.read() packet = StatisticsPacket(data) # print header & payload pprint.pprint(packet.header()) pprint.pprint(packet.payload_sst)