Skip to content
Snippets Groups Projects
SDP_statistics.py 7.92 KiB
Newer Older
from datetime import datetime, timezone
def get_bit_value(value: bytes, first_bit: int, last_bit:int=None) -> int:
    """ Return bits [first_bit:last_bit] from value, and return their integer value. Bit 0 = LSB.

        For example, extracting bits 2-3 from b'01100' returns 11 binary = 3 decimal:
            get_bit_value(b'01100', 2, 3) == 3
        If 'last_bit' is not given, just the value of bit 'first_bit' is returned. """
    # default last_bit to first_bit
    if last_bit is None:
        last_bit = first_bit
    return value >> first_bit & ((1 << (last_bit - first_bit + 1)) - 1)

class StatisticsPacket(object):
    """
       Models a statistics UDP packet from SDP.

       Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers).
    """

    def __init__(self, packet: bytes):
        self.packet = packet

        # Only parse valid packets
        if self.marker not in 'SBX':
            raise ValueError("Invalid SDP statistics packet: packet marker (first byte) is '{}', not one of 'SBX'.".format(self.marker))

    @property
    def marker(self) -> str:
        """ Return the type of statistic:
        
            'S' = SST
            'B' = BST
            'X' = XST
        """

        raw_marker = unpack("c",self.packet[0:1])[0]

        try:
            return raw_marker.decode('ascii')
        except UnicodeDecodeError:
            # non-ascii (>127) character, return as binary
            #
            # this is typically not visible to the user, as these packets are not SDP statistics packets,
            # which the constructor will refuse to accept.
            return raw_marker

    @property
    def version_id(self) -> int:
        """ Return the version of this packet. """

        return unpack("B",self.packet[1:2])[0]

    @property
    def observation_id(self) -> int:
        """ Return the ID of the observation running when this packet was generated. """

        return unpack("<I",self.packet[2:6])[0]

    @property
    def station_id(self) -> int:
        """ Return the number of the station this packet was generated on. """

        return unpack("<H",self.packet[6:8])[0]

    @property
    def source_info(self) -> int:
        """ Return a dict with the source_info flags. The dict contains the following fields:
        
            _raw:                    raw value of the source_info field in the packet, as an integer.
            antenna_band_index:      antenna type. 0 = low band, 1 = high band.
            nyquist_zone_index:      nyquist zone of filter:
                                         0 =           0 -- 1/2 * t_adc Hz (low band),
                                         1 = 1/2 * t_adc -- t_adc Hz       (high band),
                                         2 =       t_adc -- 3/2 * t_adc Hz (high band).
            t_adc:                   sampling clock. 0 = 160 MHz, 1 = 200 MHz.
            fsub_type:               sampling method. 0 = critically sampled, 1 = oversampled.
            payload_error:           0 = data is ok, 1 = data is corrupted (a fault was encountered).
            beam_repositioning_flag: 0 = data is ok, 1 = beam got repositioned during packet construction (BST only).
            subband_calibrated_flag: 1 = subband data had subband calibration values applied, 0 = not
            reserved:                reserved bits
            gn_index:                global index of FPGA that emitted this packet. """

        bits = unpack("<H",self.packet[8:10])[0]

        return {
          "_raw": bits,
          "antenna_band_index": get_bit_value(bits, 15),
          "nyquist_zone_index": get_bit_value(bits, 13, 14),
          "t_adc": get_bit_value(bits, 12),
          "fsub_type": get_bit_value(bits, 11),
          "payload_error": get_bit_value(bits, 10),
          "beam_repositioning_flag": get_bit_value(bits, 9),
          "subband_calibrated_flag": get_bit_value(bits, 8),
          "reserved": get_bit_value(bits, 5, 7),
          "gn_index": get_bit_value(bits, 0, 4),
        """ Reserved bytes. """

        return self.packet[10:11]

    @property
    def integration_interval_raw(self) -> int:
        """ Returns the integration interval, in blocks. """

        # This field is 3 bytes, little endian, so we need to append a 0 to parse it as a 32-bit integer.
        return unpack("<I", self.packet[11:14] + b'0')[0]

    def integration_interval(self) -> float:
        """ Returns the integration interval, in seconds. """

        # Translate to seconds using the block period
        return self.integration_interval_raw * self.block_period()
        """ Returns the generic data identifier. """

        return unpack("<I",self.packet[14:18])[0]

    @property
    def nof_signal_inputs(self) -> int:
        """ Number of inputs that were used for constructing the payload. """
        return unpack("<B",self.packet[18:19])[0]

    @property
    def nof_bytes_per_statistic(self) -> int:
        return unpack("<B",self.packet[19:20])[0]

    @property
    def nof_statistics_per_packet(self) -> int:
    @property
    def block_period_raw(self) -> int:
        """ Return the block period, in nanoseconds. """

        return unpack("<H",self.packet[22:24])[0]

    def block_period(self) -> float:
        """ Return the block period, in seconds. """

        return unpack("<Q",self.packet[24:32])[0]

    def timestamp(self) -> datetime:
        """ Returns the timestamp of the data in this packet. """

        return datetime.fromtimestamp(self.block_serial_number * self.block_period(), timezone.utc)
        """ Return all the header fields as a dict. """
        return {
          "marker": self.marker,
          "version_id": self.version_id,
          "observation_id": self.observation_id,
          "station_id": self.station_id,
          "source_info": self.source_info,
          "integration_interval_raw": self.integration_interval_raw,
          "integration_interval": self.integration_interval(),
          "data_id": self.data_id,
          "nof_signal_inputs": self.nof_signal_inputs,
          "nof_bytes_per_statistic": self.nof_bytes_per_statistic,
          "nof_statistics_per_packet": self.nof_statistics_per_packet,
          "block_period_raw": self.block_period_raw,
          "block_serial_number": self.block_serial_number,
    def payload_sst(self) -> numpy.array:
        """ The payload of this packet, interpreted as SST data. """

        if self.marker != 'S':
            raise Exception("Payload of SST requested of a non-SST packet. Actual packet marker is '{}', but must be 'S'.".format(self.marker))

        # derive which and how many elements to read from the packet header
        bytecount_to_unsigned_struct_type = { 1: 'B', 2: 'H', 4: 'I', 8: 'Q' }
        format_str = "<{}{}".format(self.nof_statistics_per_packet, bytecount_to_unsigned_struct_type[self.nof_bytes_per_statistic])
        return numpy.array(unpack(format_str, self.packet[32:32+calcsize(format_str)]))

if __name__ == "__main__":
    # parse one packet from stdin
    import sys
    import pprint

Jan David Mol's avatar
Jan David Mol committed
    # read all of stdin, even though we only parse the first packet. we're too lazy to intelligently decide when
    # the packet is complete and can stop reading.
    data = sys.stdin.buffer.read()
    packet = StatisticsPacket(data)
Jan David Mol's avatar
Jan David Mol committed

    # print header & payload