diff --git a/devices/clients/StatisticsPacket.py b/devices/clients/StatisticsPacket.py index 8f21128c242abec3aece814c827993dd5aaae5a1..ef3023a70011eb0cee89fca9a1b9d7778f5330de 100644 --- a/devices/clients/StatisticsPacket.py +++ b/devices/clients/StatisticsPacket.py @@ -1,9 +1,9 @@ -from struct import unpack, calcsize +import struct from datetime import datetime, timezone from typing import Tuple import numpy -__all__ = ["StatisticsPacket"] +__all__ = ["StatisticsPacket", "SSTPacket", "XSTPacket", "BSTPacket"] def get_bit_value(value: bytes, first_bit: int, last_bit:int=None) -> int: """ Return bits [first_bit:last_bit] from value, and return their integer value. Bit 0 = LSB. @@ -24,19 +24,111 @@ class StatisticsPacket(object): Models a statistics UDP packet from SDP. Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers). + + The following fields are exposed as properties & functions. The _raw fields come directly + from the packet, and have more user-friendly alternatives for intepretation: + + marker_raw packet marker as byte. + marker() packet marker as character. 'S' = SST, 'X' = XST, 'B' = BST + + version_id packet format version. + observation_id observation identifier. + station_id station identifier. + + source_info: bit field with input information, encoding several other properties. + antenna_band_index: antenna type. 0 = low band, 1 = high band. + nyquist_zone_index: nyquist zone of filter: + 0 = 0 -- 1/2 * t_adc Hz (low band), + 1 = 1/2 * t_adc -- t_adc Hz (high band), + 2 = t_adc -- 3/2 * t_adc Hz (high band). + t_adc: sampling clock. 0 = 160 MHz, 1 = 200 MHz. + fsub_type: sampling method. 0 = critically sampled, 1 = oversampled. + payload_error: 0 = data is ok, 1 = data is corrupted (a fault was encountered). + beam_repositioning_flag: 0 = data is ok, 1 = beam got repositioned during packet construction (BST only). + subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not. + gn_index: global index of FPGA that emitted this packet. + + data_id: bit field with payload information, encoding several other properties. + + nof_signal_inputs: number of inputs that contributed to data in this packet. + nof_bytes_per_statistics: word size of each statistic. + nof_statistics_per_packet: number of statistic data points in the payload. + + integration_interval_raw: integration interval, in block periods. + integration_interval(): integration interval, in seconds. + block_period_raw: block period, in nanoseconds. + block_period(): block period, in seconds. + block_serial_number: timestamp of the data, in block periods since 1970. + timestamp(): timestamp of the data, as a datetime object. + """ def __init__(self, packet: bytes): self.packet = packet + self.unpack() + # Only parse valid statistics packets from SDP, reject everything else - if type(self.marker) == bytes or self.marker not in 'SBX': - raise ValueError("Invalid SDP statistics packet: packet marker (first byte) is '{}', not one of 'SBX'.".format(self.marker)) + if self.marker_raw not in b'SBX': + raise ValueError("Invalid SDP statistics packet: packet marker (first byte) is {}, not one of 'SBX'.".format(self.marker)) + + def unpack(self): + """ Unpack the packet into properties of this object. """ + + # format string for the header, see unpack below + self.header_format = ">cBL HHB BHL BBH HQ" + self.header_size = struct.calcsize(self.header_format) + + # unpack fields + try: + (self.marker_raw, + self.version_id, + self.observation_id, + self.station_id, + self.source_info, + # reserved byte + _, + # integration interval, in block periods. This field is 3 bytes, big endian -- combine later + integration_interval_hi, + integration_interval_lo, + self.data_id, + self.nof_signal_inputs, + self.nof_bytes_per_statistic, + self.nof_statistics_per_packet, + self.block_period_raw, + self.block_serial_number) = struct.unpack(self.header_format, self.packet[:self.header_size]) + + self.integration_interval_raw = (integration_interval_hi << 16) + integration_interval_lo + except struct.error as e: + raise ValueError("Error parsing statistics packet") from e + + # unpack the fields we just updated + self.unpack_source_info() + self.unpack_data_id() + + def unpack_source_info(self): + """ Unpack the source_info field into properties of this object. """ + + self.antenna_band_index = get_bit_value(self.source_info, 15) + self.nyquist_zone_index = get_bit_value(self.source_info, 13, 14) + self.t_adc = get_bit_value(self.source_info, 12) + self.fsub_type = get_bit_value(self.source_info, 11) + self.payload_error = get_bit_value(self.source_info, 10) + self.beam_repositioning_flag = get_bit_value(self.source_info, 9) + self.subband_calibrated_flag = get_bit_value(self.source_info, 8) + # self.source_info 5-7 are reserved + self.gn_index = get_bit_value(self.source_info, 0, 4) + + def unpack_data_id(self): + """ Unpack the data_id field into properties of this object. """ + + # only useful in specialisations (XST/SST/BST) + pass def expected_size(self) -> int: """ The size this packet should be (header + payload), according to the header. """ - return 32 + self.nof_statistics_per_packet * self.nof_bytes_per_statistic + return self.header_size + self.nof_statistics_per_packet * self.nof_bytes_per_statistic @property def marker(self) -> str: @@ -47,84 +139,14 @@ class StatisticsPacket(object): 'X' = XST """ - raw_marker = unpack("c",self.packet[0:1])[0] - try: - return raw_marker.decode('ascii') + return self.marker_raw.decode('ascii') except UnicodeDecodeError: # non-ascii (>127) character, return as binary # # this is typically not visible to the user, as these packets are not SDP statistics packets, # which the constructor will refuse to accept. - return raw_marker - - @property - def version_id(self) -> int: - """ Return the version of this packet. """ - - return unpack("B",self.packet[1:2])[0] - - @property - def observation_id(self) -> int: - """ Return the ID of the observation running when this packet was generated. """ - - return unpack(">I",self.packet[2:6])[0] - - @property - def station_id(self) -> int: - """ Return the number of the station this packet was generated on. """ - - return unpack(">H",self.packet[6:8])[0] - - @property - def source_info(self) -> int: - """ Return a dict with the source_info flags. The dict contains the following fields: - - _raw: raw value of the source_info field in the packet, as an integer. - antenna_band_index: antenna type. 0 = low band, 1 = high band. - nyquist_zone_index: nyquist zone of filter: - 0 = 0 -- 1/2 * t_adc Hz (low band), - 1 = 1/2 * t_adc -- t_adc Hz (high band), - 2 = t_adc -- 3/2 * t_adc Hz (high band). - t_adc: sampling clock. 0 = 160 MHz, 1 = 200 MHz. - fsub_type: sampling method. 0 = critically sampled, 1 = oversampled. - payload_error: 0 = data is ok, 1 = data is corrupted (a fault was encountered). - beam_repositioning_flag: 0 = data is ok, 1 = beam got repositioned during packet construction (BST only). - subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not - reserved: reserved bits - gn_index: global index of FPGA that emitted this packet. """ - - bits = unpack(">H",self.packet[8:10])[0] - - return { - "_raw": bits, - "antenna_band_index": get_bit_value(bits, 15), - "nyquist_zone_index": get_bit_value(bits, 13, 14), - "t_adc": get_bit_value(bits, 12), - "fsub_type": get_bit_value(bits, 11), - "payload_error": get_bit_value(bits, 10), - "beam_repositioning_flag": get_bit_value(bits, 9), - "subband_calibrated_flag": get_bit_value(bits, 8), - "reserved": get_bit_value(bits, 5, 7), - "gn_index": get_bit_value(bits, 0, 4), - } - - @property - def payload_error(self) -> bool: - return self.source_info["payload_error"] == 1 - - @property - def reserved(self) -> bytes: - """ Reserved bytes. """ - - return self.packet[10:11] - - @property - def integration_interval_raw(self) -> int: - """ Returns the integration interval, in blocks. """ - - # This field is 3 bytes, big endian, so we need to prepend a 0 to parse it as a 32-bit integer. - return unpack(">I", b'0' + self.packet[11:14])[0] + return self.marker_raw def integration_interval(self) -> float: """ Returns the integration interval, in seconds. """ @@ -132,85 +154,11 @@ class StatisticsPacket(object): # Translate to seconds using the block period return self.integration_interval_raw * self.block_period() - @property - def data_id_raw(self) -> int: - """ Returns the generic data identifier. """ - - return unpack(">I",self.packet[14:18])[0] - - @property - def sst_signal_input_index(self) -> int: - """ SST: Returns the input (antenna) number for which this packet holds statistics. """ - - if self.marker != 'S': - raise Exception("Property of SST requested of a non-SST packet. Actual packet marker is '{}', but must be 'S'.".format(self.marker)) - - return get_bit_value(self.data_id_raw, 0, 7) - - @property - def bst_beamlet_index(self) -> int: - """ BST: Returns the number of the beamlet for which this packet holds statistics. """ - - if self.marker != 'B': - raise Exception("Property of BST requested of a non-BST packet. Actual packet marker is '{}', but must be 'B'.".format(self.marker)) - - return get_bit_value(self.data_id_raw, 0, 15) - - @property - def xst_subband_index(self) -> int: - """ XST: Returns the number of the subband for which this packet holds statistics. """ - - if self.marker != 'X': - raise Exception("Property of XST requested of a non-XST packet. Actual packet marker is '{}', but must be 'X'.".format(self.marker)) - - return get_bit_value(self.data_id_raw, 16, 24) - - @property - def xst_baseline(self) -> Tuple[int, int]: - """ XST: Returns the pair of inputs (antennae) for which this packet holds statistics. """ - - if self.marker != 'X': - raise Exception("Property of XST requested of a non-XST packet. Actual packet marker is '{}', but must be 'X'.".format(self.marker)) - - signal_input_a = get_bit_value(self.data_id_raw, 8, 15) - signal_input_b = get_bit_value(self.data_id_raw, 0, 7) - - return (signal_input_a, signal_input_b) - - @property - def nof_signal_inputs(self) -> int: - """ Number of inputs that were used for constructing the payload. """ - return unpack(">B",self.packet[18:19])[0] - - @property - def nof_bytes_per_statistic(self) -> int: - """ Word size for the payload. """ - - return unpack(">B",self.packet[19:20])[0] - - @property - def nof_statistics_per_packet(self) -> int: - """ Number of data points in the payload. """ - - return unpack(">H",self.packet[20:22])[0] - - @property - def block_period_raw(self) -> int: - """ Return the block period, in nanoseconds. """ - - return unpack(">H",self.packet[22:24])[0] - def block_period(self) -> float: """ Return the block period, in seconds. """ return self.block_period_raw / 1e9 - @property - def block_serial_number(self) -> int: - """ Block index since epoch (1970). """ - - return unpack(">Q",self.packet[24:32])[0] - def timestamp(self) -> datetime: """ Returns the timestamp of the data in this packet. @@ -231,11 +179,22 @@ class StatisticsPacket(object): "version_id": self.version_id, "observation_id": self.observation_id, "station_id": self.station_id, - "source_info": self.source_info, - "reserved": self.reserved, + "source_info": { + "_raw": self.source_info, + "antenna_band_index": self.antenna_band_index, + "nyquist_zone_index": self.nyquist_zone_index, + "t_adc": self.t_adc, + "fsub_type": self.fsub_type, + "payload_error": self.payload_error, + "beam_repositioning_flag": self.beam_repositioning_flag, + "subband_calibrated_flag": self.subband_calibrated_flag, + "gn_index": self.gn_index, + }, + "data_id": { + "_raw": self.data_id, + }, "integration_interval_raw": self.integration_interval_raw, "integration_interval": self.integration_interval(), - "data_id_raw": self.data_id_raw, "nof_signal_inputs": self.nof_signal_inputs, "nof_bytes_per_statistic": self.nof_bytes_per_statistic, "nof_statistics_per_packet": self.nof_statistics_per_packet, @@ -245,36 +204,108 @@ class StatisticsPacket(object): "timestamp": self.timestamp(), } - # add statistics-specific fields - if self.marker == 'S': - header.update({ - "sst_signal_input_index": self.sst_signal_input_index, - }) - elif self.marker == 'B': - header.update({ - "bst_beamlet_index": self.bst_beamlet_index, - }) - elif self.marker == 'X': - header.update({ - "xst_subband_index": self.xst_subband_index, - "xst_baseline": self.xst_baseline, - }) - return header - @property - def payload_sst(self) -> numpy.array: - """ The payload of this packet, interpreted as SST data. """ +class SSTPacket(StatisticsPacket): + """ + Models an SST statistics UDP packet from SDP. + + The following fields are exposed as properties & functions. + + signal_input_index: input (antenna polarisation) index for which this packet contains statistics + + payload[nof_statistics_per_packet]: SST statistics, an array of amplitudes per subband. + """ + + def __init__(self, packet): + super().__init__(packet) + + # We only parse SST packets if self.marker != 'S': raise Exception("Payload of SST requested of a non-SST packet. Actual packet marker is '{}', but must be 'S'.".format(self.marker)) + def unpack_data_id(self): + super().unpack_data_id() + + self.signal_input_index = get_bit_value(self.data_id, 0, 7) + + def header(self): + header = super().header() + + header["data_id"]["signal_input_index"] = self.signal_input_index + + return header + + @property + def payload(self) -> numpy.array: + """ The payload of this packet, interpreted as SST data. """ + # derive which and how many elements to read from the packet header bytecount_to_unsigned_struct_type = { 1: 'B', 2: 'H', 4: 'I', 8: 'Q' } format_str = ">{}{}".format(self.nof_statistics_per_packet, bytecount_to_unsigned_struct_type[self.nof_bytes_per_statistic]) - return numpy.array(unpack(format_str, self.packet[32:32+calcsize(format_str)])) + return numpy.array(struct.unpack(format_str, self.packet[self.header_size:self.header_size + struct.calcsize(format_str)])) +class XSTPacket(StatisticsPacket): + """ + Models an XST statistics UDP packet from SDP. + + The following fields are exposed as properties & functions. + + + subband_index: subband number for which this packet contains statistics. + baseline: antenna pair for which this packet contains statistics. + """ + + def __init__(self, packet): + super().__init__(packet) + + # We only parse XST packets + if self.marker != 'X': + raise Exception("Payload of XST requested of a non-XST packet. Actual packet marker is '{}', but must be 'X'.".format(self.marker)) + + def unpack_data_id(self): + super().unpack_data_id() + + self.subband_index = get_bit_value(self.data_id, 16, 24) + self.baseline = (get_bit_value(self.data_id, 8, 15), get_bit_value(self.data_id, 0, 7)) + + def header(self): + header = super().header() + + header["data_id"]["subband_index"] = self.subband_index + header["data_id"]["baseline"] = self.baseline + + return header + +class BSTPacket(StatisticsPacket): + """ + Models an BST statistics UDP packet from SDP. + + The following fields are exposed as properties & functions. + + bst_beamlet_index: the number of the beamlet for which this packet holds statistics. + """ + + def __init__(self, packet): + super().__init__(packet) + + # We only parse BST packets + if self.marker != 'B': + raise Exception("Payload of BST requested of a non-BST packet. Actual packet marker is '{}', but must be 'B'.".format(self.marker)) + + def unpack_data_id(self): + super().unpack_data_id() + + self.bst_beamlet_index = get_bit_value(self.data_id, 0, 15) + + def header(self): + header = super().header() + + header["data_id"]["bst_beamlet_index"] = self.bst_beamlet_index + + return header if __name__ == "__main__": # parse one packet from stdin @@ -284,9 +315,9 @@ if __name__ == "__main__": # read all of stdin, even though we only parse the first packet. we're too lazy to intelligently decide when # the packet is complete and can stop reading. data = sys.stdin.buffer.read() - packet = StatisticsPacket(data) + packet = SSTPacket(data) # print header & payload pprint.pprint(packet.header()) - pprint.pprint(packet.payload_sst) + pprint.pprint(packet.payload) diff --git a/devices/clients/sst_client.py b/devices/clients/sst_client.py index 4ebf4a9aa76e4b626390f4c85dc0ad04a88ed34b..5a6153fc0f38df3ab837e2881d588c65382462b1 100644 --- a/devices/clients/sst_client.py +++ b/devices/clients/sst_client.py @@ -2,7 +2,7 @@ import queue from threading import Thread import socket from util.comms_client import CommClient -from clients.StatisticsPacket import StatisticsPacket +from clients.StatisticsPacket import SSTPacket from queue import Queue @@ -224,19 +224,14 @@ class SST_collector(Thread): try: try: - fields = StatisticsPacket(packet) + fields = SSTPacket(packet) except ValueError: - # could not parse header - self.parameters["nof_invalid_packets"] += numpy.uint64(1) - return - - if fields.marker != 'S': - # packet is not SST + # not an SST packet self.parameters["nof_invalid_packets"] += numpy.uint64(1) return # which input this packet contains data for - input_index = fields.sst_signal_input_index + input_index = fields.signal_input_index if input_index >= self.MAX_INPUTS: # packet describes an input that is out of bounds for us @@ -250,7 +245,7 @@ class SST_collector(Thread): # process the packet self.parameters["nof_valid_payloads"][input_index] += numpy.uint64(1) - self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload_sst + self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp()) self.parameters["integration_intervals"][input_index] = fields.integration_interval() except Exception as e: