Skip to content
Snippets Groups Projects
Select Git revision
  • 81b381ca67b35e7f1fb4412a55da615e8a3447a2
  • master default protected
  • L2SDP-LIFT
  • L2SDP-1113
  • HPR-158
5 results

readme.md

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    collector.py 21.14 KiB
    #  Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
    #  SPDX-License-Identifier: Apache-2.0
    
    """Collectors for all types of statistics
    
    This includes:
        XSTCollector
        BSTCollector
        SSTCollector
    """
    
    # logging use & lazy string, (too-few-public-methods)
    # pylint: disable=W1203, R0903
    
    import abc
    import logging
    
    import numpy
    
    from lofar_station_client.math.baseline import baseline_from_index
    from lofar_station_client.math.baseline import baseline_index
    
    # TODO(Corne): Discuss moving to lofar_common_python library?
    from lofar_station_client.math.baseline import nr_baselines
    from lofar_station_client.statistics.packet import BSTPacket
    from lofar_station_client.statistics.packet import SSTPacket
    from lofar_station_client.statistics.packet import XSTPacket
    
    from lofar_station_client.dts.constants import N_pol
    
    logger = logging.getLogger()
    
    
    class StatisticsCollector(abc.ABC):
        """Base class to process statistics packets into parameters matrices."""

        # Maximum number of FPGAs we receive data from (used for diagnostics)
        MAX_FPGAS = 16

        def __init__(self):
            self.parameters = self._default_parameters()

        def _default_parameters(self):
            """Default parameters that all types of statistics contain"""

            parameters = {}

            # Total number of packets offered to process_packet()
            parameters["nof_packets"] = numpy.uint64(0)
            # Packet count for packets that could not be parsed
            parameters["nof_invalid_packets"] = numpy.uint64(0)
            # Full contents of the latest packet we deemed invalid.
            parameters["last_invalid_packet"] = numpy.zeros((9000,), dtype=numpy.uint8)
            # Track the last invalid packet
            parameters["last_invalid_packet_exception"] = "None"
            # gn_index of FPGAs as reported in the packets, for verification purposes.
            # A value of -1 marks an unused slot.
            parameters["gn_indices"] = numpy.full(
                (self.MAX_FPGAS,), -1, dtype=numpy.int16
            )

            return parameters

        def gn_index_to_fpga_nr(self, gn_index: int) -> int:
            """Register an FPGA to the list of encountered gn_indices.

            The FPGAs can carry any gn_index 0..255 but we only maintain
            statistics for MAX_FPGAS of them. We therefor maintain a mapping
            of which gn_indices we have encountered.

            Returns the index of the registered gn_index within the list.

            Throws a ValueError if more than MAX_FPGAS are encountered."""

            gn_indices = self.parameters["gn_indices"]

            # Already registered? Then keep using the same slot.
            matches = numpy.where(gn_indices == gn_index)[0]
            if len(matches) > 0:
                return matches[0]

            # FPGA not yet in the list: claim the first free (-1) slot.
            free_slots = numpy.where(gn_indices == -1)[0]
            if len(free_slots) == 0:
                # No room anymore! Throw exception
                raise ValueError(
                    f"Cannot register data coming from FPGA {gn_index}, as "
                    f"the administration is filled with data from FPGAs "
                    f"{self.parameters['gn_indices']}"
                )

            # Add this FPGA to the list
            new_fpga_nr = free_slots[0]
            gn_indices[new_fpga_nr] = gn_index
            return new_fpga_nr

        def process_packet(self, packet):
            """Baseclass wrapper around performing parse_packet"""

            self.parameters["nof_packets"] += numpy.uint64(1)

            try:
                self._parse_packet(packet)
            except Exception as err:
                # Record diagnostics about the offending packet before re-raising.
                self.parameters["nof_invalid_packets"] += numpy.uint64(1)
                self.parameters["last_invalid_packet"] = numpy.frombuffer(
                    packet, dtype=numpy.uint8
                )
                self.parameters["last_invalid_packet_exception"] = str(err)

                raise ValueError("Could not parse statistics packet") from err

        @abc.abstractmethod
        def _parse_packet(self, packet):
            """Update any information based on this packet."""

            raise NotImplementedError
    
    
    class SSTCollector(StatisticsCollector):
        """Class to process SST statistics packets."""

        # Maximum number of antenna inputs we support (used to determine array sizes)
        MAX_INPUTS = 192

        # Maximum number of subbands we support (used to determine array sizes)
        MAX_SUBBANDS = 512

        def __init__(
            self, nr_signal_inputs: int = MAX_INPUTS, first_signal_input_index: int = 0
        ):
            self.nr_signal_inputs = nr_signal_inputs
            self.first_signal_input_index = first_signal_input_index

            super().__init__()

        def _default_parameters(self):
            """Extend the common parameters with the SST-specific arrays."""

            per_fpga = (self.MAX_FPGAS,)
            per_input = (self.nr_signal_inputs,)

            return {
                **super()._default_parameters(),
                # Number of packets received so far that we could parse correctly and do
                # not have a payload error
                "nof_valid_payloads": numpy.zeros(per_fpga, dtype=numpy.uint64),
                # Packets that reported a payload error
                "nof_payload_errors": numpy.zeros(per_fpga, dtype=numpy.uint64),
                # Last value array we've constructed out of the packets
                "sst_values": numpy.zeros(
                    (self.nr_signal_inputs, self.MAX_SUBBANDS), dtype=numpy.uint64
                ),
                # Timestamp of the last packet, per signal input
                "sst_timestamps": numpy.zeros(per_input, dtype=numpy.float64),
                # Integration interval of the last packet, per signal input
                "integration_intervals": numpy.zeros(per_input, dtype=numpy.float32),
                # Calibration flag of the last packet, per signal input
                "subbands_calibrated": numpy.zeros(per_input, dtype=bool),
            }

        def _parse_packet(self, packet):
            """Fold one SST packet into our parameters.

            Raises a ValueError if the packet describes a signal input
            outside of the range we administer."""

            fields = SSTPacket(packet)
            fpga_nr = self.gn_index_to_fpga_nr(fields.gn_index)

            # determine which input this packet contains data for
            input_index = fields.signal_input_index - self.first_signal_input_index
            if input_index < 0 or input_index >= self.nr_signal_inputs:
                # packet describes an input that is out of bounds for us
                raise ValueError(
                    f"Packet describes input {fields.signal_input_index}, but we are "
                    f"limited to describing {self.nr_signal_inputs} starting at index "
                    f"{self.first_signal_input_index}"
                )

            if fields.payload_error:
                # cannot trust the data if a payload error is reported
                self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1)

                # don't raise, as packet is valid
                return

            # process the packet
            parameters = self.parameters
            parameters["nof_valid_payloads"][fpga_nr] += numpy.uint64(1)

            nof_statistics = fields.nof_statistics_per_packet
            parameters["sst_values"][input_index, :nof_statistics] = fields.payload()
            parameters["sst_timestamps"][input_index] = numpy.float64(
                fields.timestamp().timestamp()
            )
            parameters["integration_intervals"][input_index] = (
                fields.integration_interval()
            )
            parameters["subbands_calibrated"][input_index] = (
                fields.subband_calibrated_flag
            )
    
    
    class XSTCollector(StatisticsCollector):
        """Class to process XST statistics packets.

        XSTs are received for up to MAX_PARALLEL_SUBBANDS simultaneously, and only the
        values of the last MAX_PARALLEL_SUBBANDS are kept. Raw data are collected for
        each subband in parameters["xst_blocks"], and overwritten if newer (younger)
        data is received for the same subband. As such, the data represent a rolling
        view on the XSTs.

        The xst_values() function is a user-friendly way to read the xst_blocks.

        The hardware can be configured to emit different and/or fewer subbands, causing
        some of the XSTs to become stale. It is therefor advised to inspect
        parameters["xst_timestamps"] as well.
        """

        # Maximum number of subbands for which we collect XSTs simultaneously
        MAX_PARALLEL_SUBBANDS = 8

        # Maximum number of antenna inputs we support (used to determine array sizes)
        MAX_INPUTS = 192

        # Maximum number of baselines we can receive
        MAX_BASELINES = nr_baselines(MAX_INPUTS)

        # Expected block size is BLOCK_LENGTH x BLOCK_LENGTH
        BLOCK_LENGTH = 12

        # Expected number of blocks: enough to cover all baselines without the conjugates
        # (that is, the top-left triangle of the matrix).
        MAX_BLOCKS = nr_baselines(MAX_INPUTS // BLOCK_LENGTH)

        # Maximum number of subbands we support (used to determine array sizes)
        MAX_SUBBANDS = 512

        # Complex values are (real, imag). A bit silly, but we don't want magical
        # constants.
        VALUES_PER_COMPLEX = 2

        def __init__(
            self,
            nr_signal_inputs: int = MAX_INPUTS,
            first_signal_input_index: int = 0,
            nr_parallel_subbands: int = MAX_PARALLEL_SUBBANDS,
        ):
            """Initialise the collector for a given set of signal inputs.

            Raises a ValueError if nr_signal_inputs is not a multiple of
            BLOCK_LENGTH, since packets arrive as whole BLOCK_LENGTH x
            BLOCK_LENGTH blocks."""

            if nr_signal_inputs % self.BLOCK_LENGTH != 0:
                # This restriction could be lifted if we slice sub blocks out
                # of the XST packets.
                raise ValueError(
                    f"Number of signal inputs must be a multiple of {self.BLOCK_LENGTH}, "
                    f"but is {nr_signal_inputs}"
                )

            self.nr_signal_inputs = nr_signal_inputs
            self.first_signal_input_index = first_signal_input_index
            self.nr_parallel_subbands = nr_parallel_subbands

            # super().__init__() calls _default_parameters(), which needs the
            # attributes above, so it must come last.
            super().__init__()

        @property
        def nr_blocks(self):
            """Number of blocks that contain XSTs for our signal inputs."""
            return nr_baselines(self.nr_signal_inputs // self.BLOCK_LENGTH)

        def _default_parameters(self):
            """Extend the common parameters with the XST-specific arrays."""

            defaults = super()._default_parameters()

            defaults.update(
                {
                    # Number of packets received so far that we could parse correctly
                    # and do not have a payload error
                    "nof_valid_payloads": numpy.zeros(
                        (self.MAX_FPGAS,), dtype=numpy.uint64
                    ),
                    # Packets that reported a payload error
                    "nof_payload_errors": numpy.zeros(
                        (self.MAX_FPGAS,), dtype=numpy.uint64
                    ),
                    # Last value array we've constructed out of the packets
                    "xst_blocks": numpy.zeros(
                        (
                            self.nr_parallel_subbands,
                            self.nr_blocks,
                            self.BLOCK_LENGTH * self.BLOCK_LENGTH * self.VALUES_PER_COMPLEX,
                        ),
                        dtype=numpy.int64,
                    ),
                    # Whether the values are actually conjugated and transposed
                    "xst_conjugated": numpy.zeros(
                        (
                            self.nr_parallel_subbands,
                            self.nr_blocks,
                        ),
                        dtype=bool,
                    ),
                    # When the youngest data for each subband was received
                    "xst_timestamps": numpy.zeros(
                        (self.nr_parallel_subbands,), dtype=numpy.float64
                    ),
                    # Which subband occupies each slot
                    "xst_subbands": numpy.zeros(
                        (self.nr_parallel_subbands,), dtype=numpy.uint16
                    ),
                    # Integration interval of the youngest data, per slot
                    "xst_integration_intervals": numpy.zeros(
                        (self.nr_parallel_subbands,), dtype=numpy.float32
                    ),
                }
            )

            return defaults

        def select_subband_slot(self, subband):
            """Return which subband slot (0..nr_parallel_subbands) to use

            Selects which frontend to use if it is a new subband.
            Keep recording the same subband if we're already tracking it, but
            allocate or replace a slot if not.
            """

            indices = numpy.where(self.parameters["xst_subbands"] == subband)[0]

            if len(indices) > 0:
                # subband already being recorded, use same spot
                return indices[0]

            # a new subband, kick out the oldest
            oldest_timestamp = self.parameters["xst_timestamps"].min()

            # prefer the first one in case of multiple minima
            return numpy.where(self.parameters["xst_timestamps"] == oldest_timestamp)[0][0]

        def _parse_packet(self, packet):
            """Fold one XST packet (a block of baselines) into our parameters.

            Raises a ValueError if the packet's block size, baseline range,
            or baseline alignment does not match what we administer."""

            fields = XSTPacket(packet)

            fpga_nr = self.gn_index_to_fpga_nr(fields.gn_index)

            if fields.payload_error:
                # cannot trust the data if a payload error is reported
                self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1)

                # don't raise, as packet is valid
                return

            # the blocks must be of size BLOCK_LENGTH x BLOCK_LENGTH
            if fields.nof_signal_inputs != self.BLOCK_LENGTH:
                raise ValueError(
                    f"Packet describes a block of {fields.nof_signal_inputs} x "
                    f"{fields.nof_signal_inputs} baselines, but we can only parse blocks"
                    f"of {self.BLOCK_LENGTH} x {self.BLOCK_LENGTH} baselines"
                )

            # check whether set of baselines in this packet are not out of bounds
            for antenna in (0, 1):
                if not (
                    0
                    <= fields.first_baseline[antenna]
                    + fields.nof_signal_inputs
                    - self.first_signal_input_index
                    <= self.nr_signal_inputs
                ):
                    # packet describes an input that is out of bounds for us
                    raise ValueError(
                        f"Packet describes {fields.nof_signal_inputs} x "
                        f"{fields.nof_signal_inputs} baselines starting at "
                        f"{fields.first_baseline}, but we are limited to describing "
                        f"{self.nr_signal_inputs} starting at offset "
                        f"{self.first_signal_input_index}"
                    )

                # the blocks of baselines need to be tightly packed, and thus be provided
                # at exact intervals
                if fields.first_baseline[antenna] % self.BLOCK_LENGTH != 0:
                    raise ValueError(
                        f"Packet describes baselines starting at {fields.first_baseline}, "
                        f"but we require a multiple of BLOCK_LENGTH={self.BLOCK_LENGTH}"
                    )

            # Make sure we always have a baseline (a,b) with a>=b. If not, we swap the
            # indices and mark that the data must be conjugated and transposed when
            # processed.
            first_baseline = fields.first_baseline
            if first_baseline[0] < first_baseline[1]:
                conjugated = True
                first_baseline = (first_baseline[1], first_baseline[0])
            else:
                conjugated = False

            # Adjust for our offset
            first_baseline = (
                first_baseline[0] - self.first_signal_input_index,
                first_baseline[1] - self.first_signal_input_index,
            )

            # we keep track of multiple subbands. select slot for this one
            subband_slot = self.select_subband_slot(fields.subband_index)

            assert 0 <= subband_slot < self.nr_parallel_subbands, (
                f"Selected slot{subband_slot}, but only have room for "
                f"{self.nr_parallel_subbands}. Existing slots are "
                f"{self.parameters['xst_subbands']}, processing subband "
                f"{fields.subband_index}."
            )

            # the payload contains complex values for the block of baselines of size
            # BLOCK_LENGTH x BLOCK_LENGTH starting at baseline first_baseline.
            #
            # we honour this format, as we want to keep the metadata together with these
            # blocks. we do need to put the blocks in a linear and tight order, however,
            # so we calculate a block index.
            block_index = baseline_index(
                first_baseline[0] // self.BLOCK_LENGTH,
                first_baseline[1] // self.BLOCK_LENGTH,
            )

            # We did enough checks on first_baseline for this to be a logic error in our
            # code
            assert 0 <= block_index < self.nr_blocks, (
                f"Received block {block_index}, but have only room for {self.nr_blocks}"
                f". Block starts at baseline {first_baseline}."
            )

            # process the packet
            self.parameters["nof_valid_payloads"][fpga_nr] += numpy.uint64(1)

            self.parameters["xst_blocks"][
                subband_slot, block_index, : fields.nof_statistics_per_packet
            ] = fields.payload()
            self.parameters["xst_timestamps"][subband_slot] = numpy.float64(
                fields.timestamp().timestamp()
            )
            self.parameters["xst_conjugated"][subband_slot, block_index] = conjugated
            self.parameters["xst_subbands"][subband_slot] = numpy.uint16(
                fields.subband_index
            )
            self.parameters["xst_integration_intervals"][
                subband_slot
            ] = fields.integration_interval()

        def xst_values(self, subband_indices=None):
            """xst_blocks, but as a
            matrix[len(subband_indices)][nr_signal_inputs][nr_signal_inputs]

            The subband indices must be in [0..nr_parallel_subbands). By default, all
            recorded XSTs are returned.
            """

            if subband_indices is None:
                subband_indices = range(self.nr_parallel_subbands)

            matrix = numpy.zeros(
                (len(subband_indices), self.nr_signal_inputs, self.nr_signal_inputs),
                dtype=numpy.complex64,
            )
            xst_blocks = self.parameters["xst_blocks"]
            xst_conjugated = self.parameters["xst_conjugated"]

            for matrix_idx, subband_index in enumerate(subband_indices):
                for block_index in range(self.nr_blocks):
                    # convert real/imag int to complex float values. this works as
                    # real/imag come in pairs
                    block = (
                        xst_blocks[subband_index][block_index]
                        .astype(numpy.float32)
                        .view(numpy.complex64)
                    )

                    # reshape into [a][b]
                    block = block.reshape(self.BLOCK_LENGTH, self.BLOCK_LENGTH)

                    if xst_conjugated[subband_index][block_index]:
                        # block is conjugated and transposed. process.
                        block = block.conjugate().transpose()

                    # compute destination in matrix
                    first_baseline = baseline_from_index(block_index)
                    first_baseline = (
                        first_baseline[0] * self.BLOCK_LENGTH,
                        first_baseline[1] * self.BLOCK_LENGTH,
                    )

                    # copy block into matrix
                    matrix[matrix_idx][
                        first_baseline[0] : first_baseline[0] + self.BLOCK_LENGTH,
                        first_baseline[1] : first_baseline[1] + self.BLOCK_LENGTH,
                    ] = block

            return matrix
    
    
    class BSTCollector(StatisticsCollector):
        """Class to process BST statistics packets."""

        # Maximum number of beamlets we support. Each beamlet carries
        # statistics for N_pol (x and y) polarisations.
        MAX_BEAMLETS = 488

        def _default_parameters(self):
            """Extend the common parameters with the BST-specific arrays."""

            defaults = super()._default_parameters()

            defaults.update(
                {
                    # Number of packets received so far that we could parse correctly and
                    # do not have a payload error
                    "nof_valid_payloads": numpy.zeros(
                        (self.MAX_FPGAS,),
                        dtype=numpy.uint64,
                    ),
                    # Packets that reported a payload error
                    "nof_payload_errors": numpy.zeros(
                        (self.MAX_FPGAS,),
                        dtype=numpy.uint64,
                    ),
                    # Last value array we've constructed out of the packets
                    "bst_values": numpy.zeros(
                        (self.MAX_BEAMLETS, N_pol),
                        dtype=numpy.uint64,
                    ),
                    # When the last data was received, per FPGA
                    "bst_timestamps": numpy.zeros(
                        (self.MAX_FPGAS,),
                        dtype=numpy.float64,
                    ),
                    # Integration interval of the last packet, per FPGA
                    "integration_intervals": numpy.zeros(
                        (self.MAX_FPGAS,),
                        dtype=numpy.float32,
                    ),
                }
            )

            return defaults

        def _parse_packet(self, packet):
            """Fold one BST packet into our parameters.

            Raises a ValueError if the packet describes beamlets beyond
            MAX_BEAMLETS."""

            fields = BSTPacket(packet)

            fpga_nr = self.gn_index_to_fpga_nr(fields.gn_index)

            # number of beamlets in the packet
            beamlets = fields.payload()
            nr_beamlets = beamlets.shape[0]
            first_beamlet = fields.beamlet_index
            last_beamlet = first_beamlet + nr_beamlets

            # determine which input this packet contains data for
            if last_beamlet > self.MAX_BEAMLETS:
                # packet describes an input that is out of bounds for us
                raise ValueError(
                    f"Packet describes {nr_beamlets} beamlets starting at "
                    f"{fields.beamlet_index}, but we are limited "
                    f"to describing MAX_BEAMLETS={self.MAX_BEAMLETS}"
                )

            if fields.payload_error:
                # cannot trust the data if a payload error is reported
                self.parameters["nof_payload_errors"][fpga_nr] += numpy.uint64(1)

                # don't raise, as packet is valid
                return

            # process the packet
            self.parameters["nof_valid_payloads"][fpga_nr] += numpy.uint64(1)

            self.parameters["bst_values"][first_beamlet:last_beamlet] = beamlets
            self.parameters["bst_timestamps"][fpga_nr] = numpy.float64(
                fields.timestamp().timestamp()
            )
            self.parameters["integration_intervals"][
                fpga_nr
            ] = fields.integration_interval()