Skip to content
Snippets Groups Projects
multirate.py 38.9 KiB
Newer Older
#! /usr/bin/env python3
###############################################################################
#
# Copyright 2024
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

###############################################################################

# Author: Eric Kooistra
# Purpose: Multirate functions for DSP
# Description:
#
# Usage:
# . Use [2] to verify this code.
#
# References:
# [1] dsp_study_erko.txt
# [2] http://localhost:8888/notebooks/pfb_os/up_downsampling.ipynb
#
# Books:
# . HARRIS 6 title Figure
# . LYONS
# . CROCHIERE
from sys import exit
from scipy import signal
from .utilities import c_rtol, c_atol, ceil_div

Eric Kooistra's avatar
Eric Kooistra committed
c_firA = [1.0]  # FIR b = coefs, a = 1

def zeros(shape, cmplx=False):
    if cmplx:
        return np.zeros(shape, dtype='cfloat')
    else:
        return np.zeros(shape)


def ones(shape, cmplx=False):
    return zeros(shape, cmplx) + 1


def down(x, D, phase=0):
    """Downsample x[n] by factor D, xD[m] = x[m D], m = n // D

    Keep every D-th sample in x and skip every D-1 samples after that sample.
    """
    xD = x[phase::D]
    return xD


def up(x, U):
    """Upsample x by factor U, xU[m] = x[n] when m = n U, else 0.

    After every sample in x insert U-1 zeros.
    """
    xU = zeros(len(x) * U, np.iscomplexobj(x))
    xU[0::U] = x
    return xU


###############################################################################
Eric Kooistra's avatar
Eric Kooistra committed
# Polyphase filter structure for input block processing
###############################################################################

class PolyPhaseFirFilterStructure:
    """Polyphase FIR filter structure (PFS) per block of data

Eric Kooistra's avatar
Eric Kooistra committed
    Purpose of PFS implementation is to avoid multiply by zero values in case
    of upsampling and to avoid calculating samples that will be discarded in
    case of downsampling. The output result of the PFS FIR is identical to
    using the FIR filter. The spectral aliasing and spectral replication are
    due to the sample rate change, not to the implementation structure.
    The FIR data can be real to process real data, or complex to process
    equivalent baseband data. The FIR coefficients are real.

Eric Kooistra's avatar
Eric Kooistra committed
    The PFS is is suitable for downsampling and for upsampling:
    - Upsampling uses the PFS as Transposed Direct-Form FIR filter, where the
      commutator selects the polyphases from phase 0 to Nphases - 1 to yield
      output samples for every input sample [HARRIS Fig 7.11, 7.12, CROCHIERE
      Fig 3.25].
    - Downsampling uses the PFS as a Direct-Form FIR filter, where the
Eric Kooistra's avatar
Eric Kooistra committed
      commutator selects the polyphases from phase Nphases - 1 downto 0 to sum
Eric Kooistra's avatar
Eric Kooistra committed
      samples from the past for one output sample [HARRIS Fig 6.12, 6.13,
      CROCHIERE Fig 3.29].
    - In both cases the commutator rotates counter clockwise, because the PFS
      uses a delay line of z^(-1) called type 1 structure [CROCHIERE 3.3.3,
      VAIDYANATHAN 4.3].
Eric Kooistra's avatar
Eric Kooistra committed
    Input:
    . coefs: Real FIR coefficients b[0 : Nphases * Ntaps - 1].
Eric Kooistra's avatar
Eric Kooistra committed
    . Nphases: number of polyphases, is number of rows (axis=0) in the
               polyphase structure.
    . cmplx: Define PFS delays for real (when False) or complex (when True)
             FIR data.
Eric Kooistra's avatar
Eric Kooistra committed
    . Ntaps: number of taps per polyphase, is number of columns (axis=1) in
      the polyphase structure.
    . polyCoefs: FIR coefficients storage with Nphases rows and Ntaps columns.
    . polyDelays: FIR taps storage with Nphases rows and Ntaps columns.
Eric Kooistra's avatar
Eric Kooistra committed

    Storage of FIR coefficients and FIR data:
      Stream of input samples x[n], n >= 0 for increasing time. Show index n
      also as value so x[n] = n:

                 oldest sample                             newest sample
        x         v                                                  v
        samples  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...]
        blocks   [         0,          1,            2,              3]
                                                                     ^
                                                           newest block

      The delayLine index corresponds to the FIR coefficient index k of
      b[k] = impulse response h[k]. For Ncoefs = 12, with Nphases = 4 rows and
      Ntaps = 3 columns:

        polyCoefs = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11] = b[k]
                                 row = polyphase
        polyCoefs = [[0, 4, 8],    0
                     [1, 5, 9],    1
                     [2, 6,10],    2
                     [3, 7,11]]    3
             column:  0, 1, 2

      Shift x[n] samples into delayLine from left, show sample index n in (n)
      and delayLine index k in [k]. Shift in per sample or per block.

        shift in -->(15,14,13,12,11,10, 9, 8, 7, 6, 5, 4) --> shift out
        delayLine = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11]

      For polyDelays shift in from left-top (n = 15 is newest input sample)
      and shift out from right-bottom (n = 4 is oldest sample).
Eric Kooistra's avatar
Eric Kooistra committed

                       v              v
        polyDelays = [[0, 4, 8],   ((15,11, 7),
                      [1, 5, 9],    (14,10, 6),
                      [2, 6,10],    (13, 9, 5),
                      [3, 7,11]]    (12, 8, 4))
                             v              v

      Output sample y[n] = sum(polyCoefs * polyDelays).

      For downsampling by Ndown = Nphases shift blocks of Ndown samples in
      from left in polyDelays, put newest block[3] = x[12,13,14,15] in first
      column [0] of polyDelays.

      Store input samples in polyDelays structure, use mapping to access as
      delayLine:
        . map_to_delay_line()
        . map_to_poly_delays()
    def __init__(self, Nphases, coefs, cmplx=False):
        self.coefs = coefs
        self.Ncoefs = len(coefs)
        self.Nphases = Nphases  # branches, rows
        self.Ntaps = ceil_div(self.Ncoefs, Nphases)  # taps, columns
        self.cmplx = cmplx
Eric Kooistra's avatar
Eric Kooistra committed
        self.init_poly_coeffs()
        self.reset_poly_delays()

    def init_poly_coeffs(self):
        """Polyphase structure for FIR filter coefficients coefs in Nphases.

        Input:
        . coefs = prototype FIR filter coefficients (= b[k] = impulse response)
        Return:
        . If Ncoefs < Nphases * Ntaps, then pad coefs with zeros.
        . E.g. coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
               coefs.reshape((4, 3)) = [[0, 1, 2],
                                        [3, 4, 5],
                                        [6, 7, 8],
                                        [9,10,11])
        . Therefore for Nphases = 4 rows and Ntaps = 3 columns use:
          polyCoefs = coefs.reshape((3, 4)).T
                    = [[0, 4, 8],    # p = 0, H0(z)
                       [1, 5, 9],    # p = 1, H1(z)
                       [2, 6,10],    # p = 2, H2(z)
                       [3, 7,11])    # p = 3, H3(z),
                                       Nphases = 4 rows (axis=0)
                                       Ntaps = 3 columns (axis=1)
        """
        polyCoefs = np.zeros(self.Nphases * self.Ntaps)  # real
Eric Kooistra's avatar
Eric Kooistra committed
        polyCoefs[0 : self.Ncoefs] = self.coefs
        self.polyCoefs = polyCoefs.reshape((self.Ntaps, self.Nphases)).T

    def reset_poly_delays(self):
        self.polyDelays = zeros((self.Nphases, self.Ntaps), self.cmplx)
    def map_to_delay_line(self):
Eric Kooistra's avatar
Eric Kooistra committed
        delayLine = self.polyDelays.T.reshape(-1)
        return delayLine

    def map_to_poly_delays(self, delayLine):
Eric Kooistra's avatar
Eric Kooistra committed
        self.polyDelays = delayLine.reshape((self.Ntaps, self.Nphases)).T
Eric Kooistra's avatar
Eric Kooistra committed
    def shift_in_data(self, inData, flipped):
Eric Kooistra's avatar
Eric Kooistra committed
        """Shift block of data into the polyDelays structure.
Eric Kooistra's avatar
Eric Kooistra committed
        View polyDelays as delay line if L < Nphases. Shift in from the left
        in the delay line, so last (= newest) sample in inData will appear at
        most left of the delay line. If L = Nphases, then it is possible to
        shift the data directly into the first (= left) column of polyDelays.

        Input:
        . inData: Block of one or more input samples with index as time index
            n in inData[n], so oldest sample at index 0 and newest sample at
            index -1.
        . flipped: False then inData order is inData[n] and still needs to be
            flipped. If True then the inData is already flipped.
        """
        L = len(inData)
        xData = inData if flipped else np.flip(inData)
        if L < self.Nphases:
            delayLine = self.map_to_delay_line()
            # Equivalent code:
Eric Kooistra's avatar
Eric Kooistra committed
            # delayLine = np.concatenate((inData, delayLine[L:]))
            delayLine = np.roll(delayLine, L)
            delayLine[:L] = xData
            self.map_to_poly_delays(delayLine)
        else:
            # Equivalent code for L == Nphases: Shift in inData block directly
            # at column 0
            self.polyDelays = np.roll(self.polyDelays, 1, axis=1)
            self.polyDelays[:, 0] = xData
Eric Kooistra's avatar
Eric Kooistra committed
    def filter_block(self, inData, flipped):
        """Filter block of inData per polyphase.
Eric Kooistra's avatar
Eric Kooistra committed
        . inData: block of input data with length <= Nphases, time index n in
            inData[n] increments, so inData[0] is oldest data and inData[-1] is
            newest data. The inData block size is = 1 for upsampling or > 1
            and <= Nphases for downsampling.
        . flipped: False then inData order is inData[n] with newest sample last
            and still needs to be flipped. If True then the inData is already
            flipped in time, so with newest sample first.
        . pfsData: block of polyphase FIR filtered output data for Nphases, with
Eric Kooistra's avatar
Eric Kooistra committed
            pfsData[p] and p = 0:Nphases-1 from top to bottom.
        # Shift in one block of input data (1 <= len(inData) <= Nphases)
Eric Kooistra's avatar
Eric Kooistra committed
        self.shift_in_data(inData, flipped)
Eric Kooistra's avatar
Eric Kooistra committed
        # Apply FIR coefs per delay element
        zData = self.polyDelays * self.polyCoefs
Eric Kooistra's avatar
Eric Kooistra committed
        # Sum FIR taps per polyphase
        pfsData = np.sum(zData, axis=1)
        # Output block of Nphases filtered output data
Eric Kooistra's avatar
Eric Kooistra committed
        return pfsData
Eric Kooistra's avatar
Eric Kooistra committed
    def filter_up(self, inSample):
        """Filter same input sample at each polyphase, to upsample it by factor
        U = Nphases.

        Input:
        . inSample: input sample that is applied to each polyphase
        Return:
        . pfsData: block of polyphase FIR filtered output data for Nphases,
            with pfsData[p] and p = 0:Nphases-1 from top to bottom. The
            pfsData[0] is the first and thus oldest and pfsData[-1] is the last
            and thus newest interpolated data.
        """
        inWires = inSample * ones(self.Nphases, np.iscomplexobj(inSample))
        pfsData = self.filter_block(inWires, flipped=True)
        return pfsData

Eric Kooistra's avatar
Eric Kooistra committed
###############################################################################
# Up, down, resample low pass input signal
###############################################################################

def polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros):
Eric Kooistra's avatar
Eric Kooistra committed
    """Polyphase data structure for downsampling whole signal x.
Eric Kooistra's avatar
Eric Kooistra committed
    The polyphase structure has Ndown branches and branch size suitable to use
    lfilter() once per polyphase branch for whole signal x. Instead of using
    pfs.polyDelays per pfs.Ntaps.
Eric Kooistra's avatar
Eric Kooistra committed
    . Prepend x with Nzeros = Ndown - 1 zeros to have first down sampled sample
      at m = 0 start at n = 0.
    . Skip any remaining last samples from x, that are not enough to yield a
      new output FIR sum.
Eric Kooistra's avatar
Eric Kooistra committed
    . x: all input samples x[n] for n = 0 : Lx - 1
    . Ndown: downsample rate and number of polyphase branches
Eric Kooistra's avatar
Eric Kooistra committed
    . Nzeros: number of zero samples to prepend for x
Eric Kooistra's avatar
Eric Kooistra committed
    . polyX: polyphase data structure with size (Ndown, Nxp) for Ndown branches
Eric Kooistra's avatar
Eric Kooistra committed
        and Nxp samples from x per branch.
    . Nx: Total number of samples from x, including prepended Ndown - 1 zeros.
Eric Kooistra's avatar
Eric Kooistra committed
    . Nxp: Total number of samples used from x per polyphase branch, is the
        number of samples Ny, that will be in downsampled output y[m] = x[m D],
        for m = 0, 1, 2, ..., Nxp - 1.
Eric Kooistra's avatar
Eric Kooistra committed
    Lx = len(x)
Eric Kooistra's avatar
Eric Kooistra committed
    Nxp = (Nzeros - 1 + Lx) // Ndown  # Number of samples per polyphase
    Nx = 1 + Ndown * (Nxp - 1)  # Used number of samples from x
Eric Kooistra's avatar
Eric Kooistra committed
    # Load x into polyX with Ndown rows = polyphases
    # . prepend x with Ndown - 1 zeros
Eric Kooistra's avatar
Eric Kooistra committed
    # . skip any last remaining samples from x, that are not enough yield a new
    #   output FIR sum.
    polyX = zeros(Ndown * Nxp, np.iscomplexobj(x))
    polyX[Ndown - 1] = x[0]
    polyX[Ndown:] = x[1 : Nx]
Eric Kooistra's avatar
Eric Kooistra committed
    # . Store data in time order per branch, so with oldest data left, to match
    #   use with lfilter(). Note this differs from order of tap data in
    #   pfs.polyDelays where newest data is left, because the block data is
    #   flipped by shift_in_data(), to match use in pfs.filter_block(), so that
Eric Kooistra's avatar
Eric Kooistra committed
    #   it can use pfs.polyDelays * pfs.polyCoefs to filter the block.
    # . The newest sample x[-1] has to be in top branch of polyX and the oldest
    #   sample x[0] in bottom branch, to match branch order of 0:Ndown-1 from
    #   top to bottom in the pfs.polyCoefs, therefore do flipud(polyX). This
    #   flipud() is similar as the flip() per block in shift_in_data().
    #
    #         x[0] is oldest                              x[-1] is newest
    #           |                                            |
    #   x[n] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15]
    #          (0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15), x[n] = n
    #                                                              newest
    #   poly                 poly        v                             |
    #   Coefs = [[0, 4, 8],  Delays = ((15,11, 7),  polyX = ((3, 7,11,15),
    #            [1, 5, 9],            (14,10, 6),           (2, 6,10,14),
    #            [2, 6,10],            (13, 9, 5),           (1, 5, 9,13),
    #            [3, 7,11]]            (12, 8, 4))           (0, 4, 8,12))
    #                                          v              |
    #                                                       oldest
Eric Kooistra's avatar
Eric Kooistra committed
    polyX = np.flipud(polyX.reshape(Nxp, Ndown).T)
    return polyX, Nx, Nxp


Eric Kooistra's avatar
Eric Kooistra committed
def polyphase_frontend(x, Nphases, coefs, sampling):
    """Calculate PFS FIR filter output of x for Nphases.

    The polyY[Nphases] output of this PFS frontend can be:
    . summed for dowsampling by factor D = Nphases
    . used as is for upsampling by factor U = Nphases
    . used with a range of Nphases LOs for a single channel downsampler
      downconverter, or upsampler upconverter
    . used with a IDFT for a analysis filterbank (PFB), or synthesis PFB.

    Input:
    . x: Input signal x[n]
    . Nphases: number of polyphase branches in PFS
    . coefs: prototype FIR filter coefficients for anti aliasing LPF. The
        len(coefs) typically is multiple of Nphases. If shorter, then the
        coefs are extended with zeros.
    . sampling: 'up' or 'down'
    Return:
    . polyY[Nphases]: Output of the FIR filtered branches in the PFS.
    . Nx = np.size(polyY), total number of samples from x
    . Nxp = np.size(polyY, axis=1), total number of samples from x per
        polyphase.
    """
    # Use polyphase FIR filter coefficients from pfs class.
    pfs = PolyPhaseFirFilterStructure(Nphases, coefs, np.iscomplexobj(x))
Eric Kooistra's avatar
Eric Kooistra committed

    # Define polyphases for whole data signal x with Nx samples, instead of
    # using polyDelays per Ntaps from pfs class, to be able to use
    # signal.lfilter() per polyphase branch for the whole data signal x.
    if sampling == 'up':
        Nx = len(x)
        Nxp = Nx
        Nup = Nphases
        # Filter whole x per polyphase, so Nxp = Nx, because the Nup branches
        # each use all x to yield Nup output samples per input sample from x.
        # The commutator index order for upsampling is p = 0, 1, .., Nup - 1,
        # so from top to bottom in the PFS.
        polyY = zeros((Nup, Nx), np.iscomplexobj(x))

Eric Kooistra's avatar
Eric Kooistra committed
        pCommutator = range(Nup)
        for p in pCommutator:
            polyY[p] = Nup * signal.lfilter(pfs.polyCoefs[p], c_firA, x)
Eric Kooistra's avatar
Eric Kooistra committed
    else:
        # 'down':
        # . Prepend x with Ndown - 1 zeros, to have y[m] for m = 0 start at
        #   n = 0 of x[n].
        # . Size of polyX is (Ndown, Nxp), and length of y is Ny = Nxp is
        #   length of each branch.
        Ndown = Nphases
        Nzeros = Ndown - 1
        polyX, Nx, Nxp = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros)
Eric Kooistra's avatar
Eric Kooistra committed
        # Filter Ndown parts of x per polyphase, because the FIR filter output
        # y will sum. The commutator index order for downsampling is p =
        # Ndown - 1,..., 1, 0, so from bottom to top in the PFS. However, the
        # commutator index order is only relevant when the branches are
        # calculated sequentially to reuse the same hardware, because for the
        # output y the branches are summed anyway.
        polyY = zeros((Ndown, Nxp), np.iscomplexobj(x))
Eric Kooistra's avatar
Eric Kooistra committed
        pCommutator = np.flip(np.arange(Ndown))
        for p in pCommutator:
            polyY[p] = signal.lfilter(pfs.polyCoefs[p], c_firA, polyX[p])
    return polyY, Nx, Nxp


Eric Kooistra's avatar
Eric Kooistra committed
def upsample(x, Nup, coefs, verify=False, verbosity=1):  # interpolate
Eric Kooistra's avatar
Eric Kooistra committed
    """Upsample x by factor U = Nup and LPF.

    Input:
    . x: Input signal x[n]
    . Nup: upsample (interpolation) factor
    . coefs: prototype FIR filter coefficients for anti aliasing LPF
Eric Kooistra's avatar
Eric Kooistra committed
    . verify: when True then verify that output y is the same when calculated
              directly or when calculated using the polyphase implementation.
Eric Kooistra's avatar
Eric Kooistra committed
    - verbosity: when > 0 print() status, else no print()
    Return:
    . y: Upsampled output signal y[m]

    Assumptions:
    . x[n] = 0 for n < 0
Eric Kooistra's avatar
Eric Kooistra committed
    . m = n * U, so first upsampled output sample starts at m = 0 based on
      n = 0
Eric Kooistra's avatar
Eric Kooistra committed
    . insert U - 1 zeros after each x[n] to get v[m], so len(v) = len(y) =
      U * len(x)
    . use coefs as anti aliasing filter, must be LPF with BW < fNyquist / U
Eric Kooistra's avatar
Eric Kooistra committed
    . len(coefs) typically is multiple of Nup. If shorter, then the coefs are
      extended with zeros.
Eric Kooistra's avatar
Eric Kooistra committed
    . The input sample period is ts and the output sample period of the
      upsampled (= interpolated signal) is tsUp = ts / U
    . The group delay is (Ncoefs - 1) / 2 * tsUp = (Ncoefs - 1) / U / 2 * ts.
      With odd Ncoefs and symmetrical coefs to have linear phase, the group
      delay is an integer number of tsUp periods, so:
      - when (Ncoefs - 1) %  2 == 0, then the group delay is an integer number
        of tsUp periods
      - when (Ncoefs - 1) % (2 * U) == 0, then the group delay is an integer
        number of ts periods.
Eric Kooistra's avatar
Eric Kooistra committed
    # Polyphase FIR filter input x
    polyY, Nx, _ = polyphase_frontend(x, Nup, coefs, 'up')

    # Output Nup samples y[m] for every input sample x[n]
    y = polyY.T.reshape(1, Nup * Nx)[0]

    if verify:
Eric Kooistra's avatar
Eric Kooistra committed
        # Inefficient upsampling implementation with multiply by zero values.
        # Verify that x --> U --> LPF --> y yields identical result y as with
        # using the PFS: x --> PFS FIR --> y.
        xZeros = zeros((Nup, Nx), np.iscomplexobj(x))
        xZeros[0] = x
        xZeros = xZeros.T.reshape(1, Nup * Nx)[0]  # upsample
        yVerify = Nup * signal.lfilter(coefs, c_firA, xZeros)  # LPF
Eric Kooistra's avatar
Eric Kooistra committed
        print('> Verify upsample():')
        if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
Eric Kooistra's avatar
Eric Kooistra committed
            print('  . PASSED: correct upsample result')
            exit('ERROR: wrong upsample result')
Eric Kooistra's avatar
Eric Kooistra committed
    if verbosity:
Eric Kooistra's avatar
Eric Kooistra committed
        print('> Log upsample():')
Eric Kooistra's avatar
Eric Kooistra committed
        print('  . Nup    =', str(Nup))
        print('  . Nx     =', str(Nx))
        print('  . len(y) =', str(len(y)))
Eric Kooistra's avatar
Eric Kooistra committed
        print('')
Eric Kooistra's avatar
Eric Kooistra committed
def downsample(x, Ndown, coefs, verify=False, verbosity=1):  # decimate
Eric Kooistra's avatar
Eric Kooistra committed
    """LPF and downsample x by factor D = Ndown.

    Input:
    . x: Input signal x[n]
    . Ndown: downsample (decimation) factor
    . coefs: prototype FIR filter coefficients for anti aliasing LPF
Eric Kooistra's avatar
Eric Kooistra committed
    . verify: when True then verify that output y is the same when calculated
              directly or when calculated using the polyphase implementation.
Eric Kooistra's avatar
Eric Kooistra committed
    - verbosity: when > 0 print() status, else no print()
    Return:
    . y: Downsampled output signal y[m]

    Assumptions:
    . x[n] = 0 for n < 0
Eric Kooistra's avatar
Eric Kooistra committed
    . m = n // D, so first downsampled output sample starts at m = 0 based on
      n = 0
Eric Kooistra's avatar
Eric Kooistra committed
    . skip D - 1 values zeros after each x[n] to get y[m], so len(y) =
      len(x) // D
    . use coefs as anti aliasing filter, must be LPF with BW < fNyquist / D
Eric Kooistra's avatar
Eric Kooistra committed
    . len(coefs) typically is multiple of Ndown. If shorter, then the coefs
      are extended with zeros.
Eric Kooistra's avatar
Eric Kooistra committed
    . The input sample period is ts and the output sample period of the
      downsampled (= decimated signal) is tsDown = ts * D
    . The group delay is (Ncoefs - 1) / 2 * ts. With odd Ncoefs and symmetrical
      coefs to have linear phase, the group delay is an integer number of ts
      periods, so:
      - when (Ncoefs - 1) %  2      == 0, then the group delay is an integer
        number of ts periods
      - when (Ncoefs - 1) % (2 * D) == 0, then the group delay is an integer
        number of tsDown periods
Eric Kooistra's avatar
Eric Kooistra committed
    # Polyphase FIR filter input x
    polyY, Nx, Nxp = polyphase_frontend(x, Ndown, coefs, 'down')

    # FIR filter sum
Eric Kooistra's avatar
Eric Kooistra committed
        # Inefficient downsampling implementation with calculating values that
Eric Kooistra's avatar
Eric Kooistra committed
        # are removed. Verify that x --> LPF --> D --> y yields identical
        # result y as with using the PFS: x --> PFS FIR --> y.
        yVerify = zeros(Ndown * Nxp, np.iscomplexobj(x))
Eric Kooistra's avatar
Eric Kooistra committed
        yVerify[0 : Nx] = signal.lfilter(coefs, c_firA, x[0 : Nx])  # LPF
Eric Kooistra's avatar
Eric Kooistra committed
        yVerify = yVerify.reshape(Nxp, Ndown).T   # polyphases
        yVerify = yVerify[0]   # downsample by D
        print('> Verify downsample():')
        if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
Eric Kooistra's avatar
Eric Kooistra committed
            print('  . PASSED: correct downsample result')
            exit('ERROR: wrong downsample result')
Eric Kooistra's avatar
Eric Kooistra committed
    if verbosity:
Eric Kooistra's avatar
Eric Kooistra committed
        print('> Log downsample():')
Eric Kooistra's avatar
Eric Kooistra committed
        print('  . len(x) =', str(len(x)))
        print('  . Ndown  =', str(Ndown))
        print('  . Nx     =', str(Nx))
        print('  . Nxp    =', str(Nxp))
        print('  . len(y) =', str(len(y)))  # = Nxp
Eric Kooistra's avatar
Eric Kooistra committed
def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1):  # interpolate and decimate by Nup / Ndown
    """Resample x by factor U / D = Nup / Ndown

    x[n] --> Nup --> v[m] --> LPF --> w[m] --> Ndown --> y[k]

Eric Kooistra's avatar
Eric Kooistra committed
    The w[m] is the result of upsampling x[n] by Nup. The LPF has narrowed the
    pass band to suit both Nup and Ndown. Therefore the downsampling by Ndown
    implies that only one in every Ndown samples of w[m] needs to be calculated
    to get y[k] = w[k * D]. Hence resampling by Nup / Ndown takes less
    processing than upsampling when Ndown > 1.
    Poly phases in up and downsampling:
Eric Kooistra's avatar
Eric Kooistra committed
    The upsampling structure outputs the interpolated samples via the separate
    output phases. The same input sequence x is used to calculate all Nup
    phases, whereby each phase uses a different set of coefficients from the
    LPF. The downsampling structure sums the Ndown phases to output the
    downsampled samples. Each phase uses a different set of coefficients from
    the LPF to filter Ndown delay phases of the input sequence x.
Eric Kooistra's avatar
Eric Kooistra committed
    Resampling is upsampling with downsampling by phase selection:
Eric Kooistra's avatar
Eric Kooistra committed
    The resampling is done by first upsampling and then downsampling, because
Eric Kooistra's avatar
Eric Kooistra committed
    then only one shared LPF is needed. For upsampling an LPF is always
Eric Kooistra's avatar
Eric Kooistra committed
    needed, because it has to construct the inserted Nup - 1 zero values. For
    downsampling the LPF of the upsampling already has restricted the input
    band width (BW), provided that the LPF has BW < fNyquist / U and BW <
    fNyquist / D. The downsampling therefore does not need an LPF and reduces
    to selecting appropriate output phase of the upsampler. The upsampler then
    only needs to calculate the output phase that will be used by the
    downsampler, instead of calculating all output phases.

    Input:
    . x: Input signal x[n]
    . Nup: upsample (interpolation) factor
    . Ndown: downsample (decimation) factor
    . coefs: FIR filter coefficients for anti aliasing LPF
Eric Kooistra's avatar
Eric Kooistra committed
    . verify: when True then verify that output y is the same when calculated
              directly or when calculated using the polyphase implementation.
Eric Kooistra's avatar
Eric Kooistra committed
    - verbosity: when > 0 print() status, else no print()
    Return:
    . y: Resampled output signal y[m]

    Assumptions:
    . x[n] = 0 for n < 0
Eric Kooistra's avatar
Eric Kooistra committed
    . k = m // D = (n * U) // D, so first resampled output sample starts at
      k = 0 based on n = 0
Eric Kooistra's avatar
Eric Kooistra committed
    . insert U - 1 zeros after each x[n] to get v[m], so len(v) = len(y) =
      U * len(x)
    . use coefs as anti aliasing filter, must be LPF with BW < fNyquist / U
      and BW < fNyquist / D
    . len(coefs) typically is multiple of Nup. If shorter, then the coefs are
      extended with zeros.
Eric Kooistra's avatar
Eric Kooistra committed
    . The group delay is the same as for the upsampler, because the
      downsampling has no additional filtering.
Eric Kooistra's avatar
Eric Kooistra committed
    # Nyp is number of samples from v, per polyphase in polyY
    Nyp = (Nv + Ndown - 1) // Ndown
    # Ny is used number of samples from v
    Ny = 1 + Ndown * (Nyp - 1)

    # Upsampling with LPF
    w = upsample(x, Nup, coefs, verify=False)
    # Downsampling selector
Eric Kooistra's avatar
Eric Kooistra committed
    # This model is not efficient, because the upsample() has calculated all
    # Nup phases, whereas it only needs to calculate the phase that is selected
    # by the downsampling. However, it is considered not worth the effort to
    # make the model more efficient, because then only parts of lfilter() in
    # upsample() need to be calulated, so then lfilter() needs to be
    # implemented manually and possibly in a for loop [CROCHIERE 3.3.4].
    downSelect = np.arange(0, Nyp) * Ndown
    y = w[downSelect]

    if verify:
        # Inefficient implementation
        # . Upsampling with multiply by zero values
Eric Kooistra's avatar
Eric Kooistra committed
        v = np.zeros((Nup, Nx))   # polyphases
        v[0] = x
        v = v.T.reshape(1, Nup * Nx)[0]  # upsample
        # . LPF
        w = Nup * signal.lfilter(coefs, c_firA, v)
        # . Downsampling with calculating values that are removed
        yVerify = np.zeros(Ndown * Nyp)
        yVerify[0 : Ny] = w[0 : Ny]
Eric Kooistra's avatar
Eric Kooistra committed
        yVerify = yVerify.reshape(Nyp, Ndown).T   # polyphases
Eric Kooistra's avatar
Eric Kooistra committed
        print('> Verify resample():')
        if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
Eric Kooistra's avatar
Eric Kooistra committed
            print('  . PASSED: correct resample result')
            exit('ERROR: wrong resample result')
Eric Kooistra's avatar
Eric Kooistra committed
    if verbosity:
Eric Kooistra's avatar
Eric Kooistra committed
        print('> Log resample():')
Eric Kooistra's avatar
Eric Kooistra committed
        print('  . len(x) =', str(len(x)))
        print('  . Nx     =', str(Nx))
        print('  . len(v) =', str(len(v)))
        print('  . Ny     =', str(Ny))
        print('  . Nyp    =', str(Nyp))
        print('  . len(y) =', str(len(y)))
Eric Kooistra's avatar
Eric Kooistra committed
        print('')
Eric Kooistra's avatar
Eric Kooistra committed


###############################################################################
# Single bandpass channel up and downsampling and up and downconversion
Eric Kooistra's avatar
Eric Kooistra committed
###############################################################################

Eric Kooistra's avatar
Eric Kooistra committed
def unit_circle_loops_phasor_arr(k, N, sign):
    """Return array of N phasors on k loops along the unit circle.

    Polyphase dependent phase offsets for bin k, when N = Ndown = Ndft [HARRIS
    Eq 6.8]. For k = 1 this yields the roots of unity.

    Input:
    . k: bin in range(N)
    . N: number of phasors
    . sign: +1 or -1 term in exp()
    . pArr: exp(sign 2j pi k / N) for k in 0 : N - 1
Eric Kooistra's avatar
Eric Kooistra committed
    """
    pArr = np.array([np.exp(sign * 2j * np.pi * p * k / N) for p in range(N)])
def time_shift_phasor_arr(k, M, Ndft, Msamples, sign):
    """Return array of Msamples phasors in time to compensate for oversampling
    time shift.

    The time shift due to down or upsampling causes a frequency component of
    k * M / Ndft. With oversampling M < Ndft, and then after down or upsampling
    there remains a frequency offset that can be compensated by given by mArr
    [HARRIS Eq 9.3].
    Input:
    . k: Index of BPF center frequency w_k = 2 pi k / Ndft
    . M: downsample or upsample factor
    . Ndft: DFT size, number of bins and number of polyphases in PFS FIR filter
    . Msamples: Requested number of output samples in time
    . sign: +1 or -1 term in exp()
    . mArr = exp(sign 2j pi k * M / Ndft * m),  for m in 0 : Msamples - 1
    mArr = np.exp(sign * 2j * np.pi * k * M / Ndft * np.arange(Msamples))
Eric Kooistra's avatar
Eric Kooistra committed


def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1):
    """BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown and
    Ndown = Ndft.
    Implement maximal downsampling downconverter for one bin (= critically
Eric Kooistra's avatar
Eric Kooistra committed
    sampled) [HARRIS Fig 6.14].

    The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of
    frequency bins, is DFT size. The downsampling is maximal so Ndown = Ndft.
    The polyphase structure has Nphases = Ndown branches, so the input x
    data that shifts in remains in each branch. Therefore each branch can be
    FIR filtered independently for the whole input x using
    polyphase_frontend().
Eric Kooistra's avatar
Eric Kooistra committed

    Input:
    . x: Input signal x[n]
    . Ndown: downsample factor
    . k: Index of BPF center frequency w_k = 2 pi k / Ndown
    . coefs: prototype FIR filter coefficients for anti aliasing BPF
    - verbosity: when > 0 print() status, else no print()
    Return:
    . yc: Downsampled and downconverted output signal yc[m], m = n // D for
Eric Kooistra's avatar
Eric Kooistra committed
          bin k. Complex baseband signal.
    """
    # Polyphase FIR filter input x
    polyY, Nx, Nxp = polyphase_frontend(x, Ndown, coefs, 'down')

    # Phase rotate per polyphase for bin k, due to delay line at branch inputs
    # [HARRIS Eq 6.8]
Eric Kooistra's avatar
Eric Kooistra committed
    kPhasors = unit_circle_loops_phasor_arr(k, Ndown, 1)
Eric Kooistra's avatar
Eric Kooistra committed
    for p in range(Ndown):
        polyYc[p] = polyY[p] * kPhasors[p]  # row = row * scalar
Eric Kooistra's avatar
Eric Kooistra committed

    # Sum the branch outputs to get single downsampled and downconverted output
    # complex baseband value yc.
Eric Kooistra's avatar
Eric Kooistra committed

    if verbosity:
        print('> Log maximal_downsample_bpf():')
        print('  . len(x)  =', str(len(x)))
        print('  . Nx      =', str(Nx))
        print('  . Nxp     =', str(Nxp))
        print('  . len(yc) =', str(len(yc)))  # = Nxp
        print('  . Ndown   =', str(Ndown))
        print('  . k       =', str(k))
        print('')
    return yc


def maximal_upsample_bpf(xBase, Nup, k, coefs, verbosity=1):
    """BPF xBase at bin k in range(Ndft), and upsample xBase by factor U = Nup =
    Ndft.

    Implement maximal upsampling upconverter for one bin (= critically
    sampled) [HARRIS Fig. 7.16].

    The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of
    frequency bins, is DFT size. The upsampling is maximal so Nup = Ndft. The
    polyphase structure has Nphases = Nup branches, so the output yc data
    shifts out from the same branch for each block of Nup samples. Therefore
    each branch can be FIR filtered independently for the whole input xBase
    using polyphase_frontend().
    . Nup: upsample factor
    . k: Index of BPF center frequency w_k = 2 pi k / Nup
    . coefs: prototype FIR filter coefficients for anti aliasing and
        interpolating BPF
    - verbosity: when > 0 print() status, else no print()
    Return:
    . yc: Upsampled and upconverted output signal yc[n], n = m U at
          intermediate frequency (IF) of bin k. Complex positive frequencies
          only signal.
    # Polyphase FIR filter input xBase
    polyY, Nx, Nxp = polyphase_frontend(xBase, Nup, coefs, 'up')

    # Phase rotate per polyphase for bin k, due to delay line at branch inputs
Eric Kooistra's avatar
Eric Kooistra committed
    # [HARRIS Eq 7.8 = 6.8, Fig 7.16], can be applied after the FIR filter,
    # because the kPhasors per polyphase are constants.
    kPhasors = unit_circle_loops_phasor_arr(k, Nup, 1)
    for p in range(Nup):
        polyYc[p] = polyY[p] * kPhasors[p]  # row = row * scalar
    # Output Nup samples yc[m] for every input sample xBase[n]
    yc = polyYc.T.reshape(1, Nup * Nx)[0]
Eric Kooistra's avatar
Eric Kooistra committed
        print('> Log maximal_upsample_bpf():')
        print('  . len(xBase) =', str(len(xBase)))
        print('  . Nx         =', str(Nx))
        print('  . Nxp        =', str(Nxp))
        print('  . len(yc)    =', str(len(yc)))  # = Nxp
        print('  . Nup        =', str(Nup))
        print('  . k          =', str(k))
Eric Kooistra's avatar
Eric Kooistra committed
def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1):
    """BPF x at bin k in range(Ndown), and downsample x by factor D = Ndown.
    Implement nonmaximal downsampling downconverter for one bin. The maximal
    downsampling downconverter for one bin has the kPhasors per polyphase
    branch of [HARRIS Eq. 6.8 and Fig 6.14]. For nonmaximal this needs to be
    extended with the tPhasors per polyphase branch of [HARRIS Eq. 9.3, to
    compensate for the oversampling time shift.
Eric Kooistra's avatar
Eric Kooistra committed

    The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of
    frequency bins, is DFT size. The polyphase FIR structure has Nphases = Ndft
    branches, to fit the requested number of bins. The polyphase FIR structure
    is maximally downsampled (= critically sampled) for Ndown = Ndft, but it
    can support any Ndown <= Ndft. The input data shifts in per Ndown samples,
    so it appears in different branches when Ndown < Ndft and a new block is
    shifted in. Therefore the input data cannot be FIR filtered per branch for
    the whole input x. Instead it needs to be FIR filtered per block of Ndown
    input samples from x, using pfs.polyDelays in pfs.filter_block().

    Input:
    . x: Input signal x[n]
    . Ndown: downsample factor
    . k: Index of BPF center frequency w_k = 2 pi k / Ndft
    . Ndft: DFT size, number of polyphases in PFS FIR filter
    . coefs: prototype LPF FIR filter coefficients for anti aliasing BPF
    - verbosity: when > 0 print() status, else no print()
    Return:
    . yc: Downsampled and downconverted output signal yc[m], m = n // D for
Eric Kooistra's avatar
Eric Kooistra committed
          bin k. Complex baseband signal.
    """
    # Prepend x with Ndown - 1 zeros, and represent x in Nblocks of Ndown
    # samples
    Nzeros = Ndown - 1
    xBlocks, Nx, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros)
Eric Kooistra's avatar
Eric Kooistra committed
    # print(xBlocks[:, 0])
Eric Kooistra's avatar
Eric Kooistra committed

    # Prepare output
    yc = np.zeros(Nblocks, dtype='cfloat')

    # PFS with Ndft polyphases
    pfs = PolyPhaseFirFilterStructure(Ndft, coefs)
Eric Kooistra's avatar
Eric Kooistra committed
    kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1)  # [HARRIS Eq 6.8]

    # Oversampling time shift compensation via frequency dependent phase shift
    tPhasors = time_shift_phasor_arr(k, Ndown, Ndft, Nblocks, -1)  # [HARRIS Eq 9.3]
Eric Kooistra's avatar
Eric Kooistra committed

    for b in range(Nblocks):
        # Filter block
        inData = xBlocks[:, b]
Eric Kooistra's avatar
Eric Kooistra committed
        pfsData = pfs.filter_block(inData, flipped=True)
        # Phase rotate polyphases for bin k
Eric Kooistra's avatar
Eric Kooistra committed
        pfsBinData = pfsData * kPhasors  # [HARRIS Eq 6.8, 9.3]
        # Sum the polyphases to get single downsampled and downconverted output
        # value
Eric Kooistra's avatar
Eric Kooistra committed
        yc[b] = np.sum(pfsBinData) * tPhasors[b]
Eric Kooistra's avatar
Eric Kooistra committed

    if verbosity:
        print('> non_maximal_downsample_bpf():')
        print('  . len(x)   =', str(len(x)))
        print('  . Nx       =', str(Nx))
        print('  . Nblocks  =', str(Nblocks))
        print('  . len(yc)  =', str(len(yc)))  # = Nblocks
        print('  . Ndown    =', str(Ndown))
        print('  . Ndft     =', str(Ndft))
        print('  . k        =', str(k))
        print('')
    return yc


def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1):
    """BPF xBase at bin k in range(Nup), and upsample xBase by factor U = Nup.

    Implement nonmaximal upsampling upconverter for one bin. The maximal
    upsampling upconverter for one bin has the kPhasors per polyphase
    branch similar of [HARRIS Fig 7.16]. For nonmaximal this needs to be
    extended with the tPhasors per polyphase branch similar as for down in
    [HARRIS Eq. 9.3], to compensate for the oversampling time shift.

    The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of
    frequency bins, is DFT size. The polyphase FIR structure has Nphases = Ndft
    branches, to fit the requested number of bins. The polyphase FIR structure
    is maximally upsampled (= critically sampled) for Nup = Ndft, but it
    can support any Nup <= Ndft. The output data shifts out per Nup samples,
    so it appears from different branches when Nup < Ndft and a new block is
    shifted out. Therefore the output data cannot be FIR filtered per branch
    for the whole input xBase. Instead it needs to be FIR filtered per block of
Eric Kooistra's avatar
Eric Kooistra committed
    Nup output samples, using pfs.polyDelays in pfs.filter_up().
Eric Kooistra's avatar
Eric Kooistra committed
    TODO
    . This code only runs ok for Ros = 1, 2 when Ndft = 16, but not for
      Ros = 4, so need to fix that. According to [CROCHIERE 7.2.7] the
      polyphase structure is only suitable for Ros is integer >= 1. For other
      Ros > 1 the weighted overlap-add (WOLA) structure is  suitable, so
      need to add WOLA.

    Input:
    . xBase: Input equivalent baseband signal xBase[m]
    . Nup: upsample factor
    . k: Index of BPF center frequency w_k = 2 pi k / Ndft
    . Ndft: DFT size, number of polyphases in PFS FIR filter
    . coefs: prototype LPF FIR filter coefficients for anti aliasing and
             interpolationBPF
    - verbosity: when > 0 print() status, else no print()
    Return:
    . yc: Upsampled and upconverted output signal yc[n], n = m U at
          intermediate frequency (IF) of bin k. Complex positive frequencies
          only signal.
    """
    Nblocks = len(xBase)

    # Prepare output
    polyYc = np.zeros((Nup, Nblocks), dtype='cfloat')

    # PFS with Ndft polyphases
Eric Kooistra's avatar
Eric Kooistra committed
    pfsUp = PolyPhaseFirFilterStructure(Nup, coefs, cmplx=True)
    kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1)  # [HARRIS Eq 7.8]

    # Oversampling time shift compensation via frequency dependent phase shift
    tPhasors = time_shift_phasor_arr(k, Nup, Ndft, Nblocks, -1)  # [HARRIS Eq 9.3]

    # Polyphase FIR filter input xBase
    for b in range(Nblocks):
        # Filter block
Eric Kooistra's avatar
Eric Kooistra committed
        inSample = xBase[b]
        pfsData = Nup * pfsUp.filter_up(inSample)
Eric Kooistra's avatar
Eric Kooistra committed
        pfsBinData = pfsData * kPhasors[0:Nup]
        # Output Nup samples yc[n] for every input sample xBase[m]
Eric Kooistra's avatar
Eric Kooistra committed
        outBinData = pfsBinData * tPhasors[b]
        polyYc[:, b] = outBinData

    yc = polyYc.T.reshape(1, Nup * Nblocks)[0]

    if verbosity:
        print('> non_maximal_upsample_bpf():')
        print('  . len(xBase) =', str(len(xBase)))
        print('  . Nblocks    =', str(Nblocks))
        print('  . len(yc)    =', str(len(yc)))  # = Nblocks
        print('  . Nup        =', str(Nup))
        print('  . Ndft       =', str(Ndft))
        print('  . k          =', str(k))
        print('')
    return yc