# SDP_statistics.py
from datetime import datetime, timezone
from struct import calcsize, unpack

import numpy

def extract_bits(value, first, last=None):
    """ Return bits [first:last] from value, both bounds inclusive. Bit 0 = LSB.

        If last is not given, the value of the single bit 'first' is returned.
        The result is shifted down, so bit 'first' of value becomes bit 0 of the result.
    """

    # default last to first
    if last is None:
        last = first

    # one set bit for each requested position: (last - first + 1) bits in total
    mask = (1 << (last - first + 1)) - 1

    # '>>' already binds tighter than '&' in Python; the parentheses only make that explicit
    return (value >> first) & mask

class StatisticsPacket(object):
    """
       Models a statistics UDP packet from SDP.

       Packets are expected to be UDP payload only (so no Ethernet/IP/UDP headers).

       Header layout as parsed by the accessors below (byte offsets, multi-byte
       fields are read little endian):

          0      marker
          1      version_id
          2-5    observation_id
          6-7    station_id
          8-9    source_info
         10      reserved
         11-13   integration_interval_raw
         14-17   data_id
         18      nof_signal_inputs
         19      nof_bytes_per_statistic
         20-21   nof_statistics_per_packet
         22-23   block_period_raw
         24-31   block_serial_number
         32-     payload

       Fields are decoded lazily on access; nothing is validated on construction.
       Accessing a field of a packet that is too short raises struct.error.
    """

    def __init__(self, packet: bytes):
        self.packet = packet

    @property
    def marker(self) -> str:
        """ Return the type of statistic:
        
            'S' = SST
            'B' = BST
            'X' = XST

            A marker byte that is not ASCII (> 127) is returned as a bytes object of length 1.
        """

        raw_marker = unpack("c", self.packet[0:1])[0]

        try:
            return raw_marker.decode('ascii')
        except UnicodeDecodeError:
            # non-ascii (>127) character, return as binary
            return raw_marker

    @property
    def version_id(self) -> int:
        """ Return the version of this packet. """

        return unpack("B", self.packet[1:2])[0]

    @property
    def observation_id(self) -> int:
        """ Return the ID of the observation running when this packet was generated. """

        return unpack("<I", self.packet[2:6])[0]

    @property
    def station_id(self) -> int:
        """ Return the number of the station this packet was generated on. """

        return unpack("<H", self.packet[6:8])[0]

    @property
    def source_info(self) -> dict:
        """ Return a dict with the source_info flags.

            The undecoded 16-bit value is available under the "_raw" key.
        """

        bits = unpack("<H", self.packet[8:10])[0]

        # NOTE(review): only bits 8-15 are decoded here; confirm against the packet
        # specification whether bits 0-7 carry further fields. They remain available in "_raw".
        return {
          "_raw": bits,
          "antenna_band_index": extract_bits(bits, 15),
          "nyquist_zone_index": extract_bits(bits, 13, 14),
          "t_adc": extract_bits(bits, 12),
          "fsub_type": extract_bits(bits, 11),
          "payload_error": extract_bits(bits, 10),
          "beam_repositioning_flag": extract_bits(bits, 9),
          "subband_calibrated_flag": extract_bits(bits, 8),
        }

    @property
    def reserved(self) -> bytes:
        """ Reserved bytes. """

        return self.packet[10:11]

    @property
    def integration_interval_raw(self) -> int:
        """ Returns the integration interval, in blocks. """

        # This field is 3 bytes, little endian, so we need to append a zero byte (the most
        # significant one) to parse it as a 32-bit integer. This must be b'\x00': b'0' is
        # the ASCII digit 0x30 and would add 0x30000000 to the value.
        return unpack("<I", self.packet[11:14] + b'\x00')[0]

    def integration_interval(self) -> float:
        """ Returns the integration interval, in seconds. """

        # Translate to seconds using the block period
        return self.integration_interval_raw * self.block_period()

    @property
    def data_id(self) -> int:
        """ Return the raw 32-bit data_id field. """

        return unpack("<I", self.packet[14:18])[0]

    @property
    def nof_signal_inputs(self) -> int:
        """ Return the raw 8-bit nof_signal_inputs field. """

        return unpack("<B", self.packet[18:19])[0]

    @property
    def nof_bytes_per_statistic(self) -> int:
        """ Return the size of one statistic in the payload, in bytes. """

        return unpack("<B", self.packet[19:20])[0]

    @property
    def nof_statistics_per_packet(self) -> int:
        """ Return the number of statistics in the payload of this packet. """

        # the two bytes between nof_bytes_per_statistic (19) and block_period_raw (22-23)
        return unpack("<H", self.packet[20:22])[0]

    @property
    def block_period_raw(self) -> int:
        """ Return the block period, in nanoseconds. """

        return unpack("<H", self.packet[22:24])[0]

    def block_period(self) -> float:
        """ Return the block period, in seconds. """

        # the raw field is in nanoseconds
        return self.block_period_raw / 1e9

    @property
    def block_serial_number(self) -> int:
        """ Return the raw 64-bit block serial number.

            timestamp() multiplies this by the block period to obtain seconds since the Unix epoch.
        """

        return unpack("<Q", self.packet[24:32])[0]

    def timestamp(self) -> datetime:
        """ Returns the timestamp of the data in this packet, as a timezone-aware datetime in UTC. """

        return datetime.fromtimestamp(self.block_serial_number * self.block_period(), timezone.utc)

    def header(self) -> dict:
        """ Return the header as a dict. """

        return {
          "marker": self.marker,
          "version_id": self.version_id,
          "observation_id": self.observation_id,
          "station_id": self.station_id,
          "source_info": self.source_info,
          "integration_interval_raw": self.integration_interval_raw,
          "integration_interval": self.integration_interval(),
          "data_id": self.data_id,
          "nof_signal_inputs": self.nof_signal_inputs,
          "nof_bytes_per_statistic": self.nof_bytes_per_statistic,
          "nof_statistics_per_packet": self.nof_statistics_per_packet,
          "block_period_raw": self.block_period_raw,
          "block_serial_number": self.block_serial_number,
        }

    @property
    def payload_sst(self):
        """ The payload of this packet, interpreted as SST data.

            Returns a numpy array of nof_statistics_per_packet unsigned integers, each
            nof_bytes_per_statistic bytes wide, read from byte 32 onwards. Raises KeyError
            if nof_bytes_per_statistic is not 1, 2, 4 or 8.
        """

        # derive which and how many elements to read from the packet header
        bytes_to_unsigned_struct_type = { 1: 'B', 2: 'H', 4: 'I', 8: 'Q' }
        format_str = "<{}{}".format(self.nof_statistics_per_packet, bytes_to_unsigned_struct_type[self.nof_bytes_per_statistic])

        # only the bytes described by the header are parsed; anything after them is ignored
        return numpy.array(unpack(format_str, self.packet[32:32+calcsize(format_str)]))

if __name__ == "__main__":
    # parse one packet from stdin
    import sys
    import pprint

    # Read all of stdin, even though only the first packet is parsed: no attempt is made to
    # detect where that packet ends, so reading cannot stop early.
    data = sys.stdin.buffer.read()
    packet = StatisticsPacket(data)
    # Print the header and the payload.