Skip to content
Snippets Groups Projects
multirate.py 25.8 KiB
Newer Older
#! /usr/bin/env python3
###############################################################################
#
# Copyright 2024
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

###############################################################################

# Author: Eric Kooistra
# Purpose: Multirate functions for DSP
# Description:
#
# Usage:
# . Use [2] to verify this code.
#
# References:
# [1] dsp_study_erko.txt
# [2] http://localhost:8888/notebooks/pfb_os/up_downsampling.ipynb
#
# Books:
# . HARRIS 6 title Figure
# . LYONS
# . CROCHIERE

import numpy as np
from scipy import signal
from .utilities import c_rtol, c_atol, ceil_div


def down(x, D, phase=0):
    """Downsample x[n] by factor D, xD[m] = x[m D], m = n // D

    Keep every D-th sample in x and skip every D-1 samples after that sample.
    """
    xD = x[phase::D]
    return xD


def up(x, U):
    """Upsample x by factor U, xU[m] = x[n] when m = n U, else 0.

    After every sample in x insert U-1 zeros.
    """
    xU = np.zeros(len(x) * U)
    xU[0::U] = x
    return xU


###############################################################################
# Polyphase filter
###############################################################################

class PolyPhaseFirFilterStructure:
    """Polyphase FIR filter structure (PFS) per block of data

    Input:
    . Nphases: number of poly phases, is data block size, is number of rows
      (axis=0) in polyphase structure
    . coefs: FIR coefficients b[0 : Nphases * Ntaps - 1]
    . commutator: 'up' or 'down', commutator 'up' selects the polyCoefs from
        phase 0 : Nphases - 1, commutator 'down' first flips the phases order
        in polyCoefs.

    Derived:
    . Ntaps: number of taps per poly phase, is number of columns (axis=1) in
      the polyphase structure

    Map LPF FIR coefs to polyCoefs with:
    . flip rows (axis=0) to have first coefs[0] at last row to fit time order
      of data block with newest sample at last index. Flipping polyCoefs once
      here at init is probably more efficient, then flipping inData and outData
      for every block.

    Filter delay memory polyDelays:
    . Shift in data blocks per column at column 0, last sample in data block
      and in column is newest.
    """
    def __init__(self, Nphases, coefs, commutator):
        self.coefs = coefs
        self.Ncoefs = len(coefs)
        self.Nphases = Nphases  # branches, rows
        self.Ntaps = ceil_div(self.Ncoefs, Nphases)  # taps, columns

        # Apply polyCoefs branches in range(Nphases) order, because coefs
        # mapping in poly_coeffs(commutator) already accounts for down or up.
        self.polyCoefs = self.poly_coeffs(commutator)

        #             oldest sample[0]
        #             newest sample [15]
        #   samples  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
        #   blocks   [         0,          1,            2,              3]
        #             shift blocks to next column in polyDelays
        #             put newest block[3] in first column of polyDelays
        #   polyDelays = [[ 8,  4,  0],
        #                 [ 9,  5,  1],
        #                 [10,  6,  2],
        #                 [11,  7,  3]])
        self.polyDelays = np.zeros((Nphases, self.Ntaps))

    def poly_coeffs(self, commutator):
        """Polyphase structure for FIR filter coefficients coefs in Nphases

        Input:
        . coefs = prototype FIR filter coefficients (impulse response)
        . commutator: 'up' or 'down'
        Return:
        . polyCoefs:
          . E.g. coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
          . If Ncoefs < Ntaps * Nphases, then pad polyCoefs with zeros.
          . Branch order of polyCoefs accounts for commutator, therefore the
            polyCoefs branches can be applied in range(Nphases) order
            independent of commutator direction.
          . commutator: 'up', first output appears when the switch is at the
            branch p = 0 and then yields a new output for every branch,
            therefore p = 0 first in polyCoefs.

              polyCoefs = [[ 0,  4,  8],   # p = 0, H0(z)
                           [ 1,  5,  9],   # p = 1, H1(z)
                           [ 2,  6, 10],   # p = 2, H2(z)
                           [ 3,  7, 11]])  # p = 3, H3(z),
                                             Nphases = 4 rows (axis=0)
                                             Ntaps = 3 columns (axis=1)

          . commutator: 'down', new FIR sum output starts being aggregated when
            switch is at branch p = Nphases-1, therefore p = Nphases - 1 first
            in polyCoefs

              polyCoefs = [[ 3,  7, 11],   # p = 3, H3(z)
                           [ 2,  6, 10],   # p = 2, H2(z)
                           [ 1,  5,  9],   # p = 1, H1(z)
                           [ 0,  4,  8]])  # p = 0, H0(z)
                                             Nphases = 4 rows (axis=0)
                                             Ntaps = 3 columns (axis=1)
        """
        polyCoefs = np.zeros(self.Nphases * self.Ntaps)
        polyCoefs[0 : self.Ncoefs] = self.coefs
        polyCoefs = polyCoefs.reshape(self.Ntaps, self.Nphases).T
        if commutator == 'down':
            polyCoefs = np.flip(polyCoefs, axis=0)
        return polyCoefs

    def filter_block(self, inData):
        """Filter block of inData per poly phase

        Input:
        . inData: block of Nphases input data, time index n increments, so
            inData[0] is oldest data and inData[Nphases-1] is newest data.
        Return:
        . outData: block of Nphases filtered output data with outData[0] is
            oldest data and outData[Nphases-1] is newest data, so time index n
            increments, like with inData.
        """
        # Shift delay memory by one block
        self.polyDelays = np.roll(self.polyDelays, 1, axis=1)
        # Shift in inData block at column 0
        self.polyDelays[:, 0] = inData
        # Apply FIR coefs per element
        zData = self.polyDelays * self.polyCoefs
        # Sum FIR taps per poly phase
        outData = np.sum(zData, axis=1)
        return outData


def poly_structure_size_for_downsampling_whole_x(Lx, Ndown):
    """Polyphase structure size for downsampling a signal x with Lx samples

    The polyphase structure has Ndown branches and branch size suitable to use lfilter() once per polyphase branch
    for whole signal x. Instead of using pfs.polyDelays per pfs.Ntaps.
    . Prepend x with Ndown - 1 zeros to have first down sampled sample at m = 0 start at n = 0.
    . Skip any remaining last samples from x, that are not enough yield a new output FIR sum.

    Input:
    . Lx: len(x)
    . Ndown: downsample rate
    Return:
    . Nx: Total number of samples from x, including prepended Ndown - 1 zeros.
    . Nxp: Total number of samples used from x per polyphase branch, is Ny the number of samples that will be in
           downsampled output y[m] = x[m D] for m = 0, 1, 2, ..., Nxp - 1
    """
    # Numer of samples per polyphase
    Nxp = (Ndown - 1 + Lx) // Ndown
    # Used number of samples from x
    Nx = 1 + Ndown * (Nxp - 1)
    return Nx, Nxp


def poly_structure_data_for_downsampling_whole_x(x, Ndown):
    """Polyphase data structure for whole signal x

    . see poly_structure_size_for_downsampling_whole_x()

    Input:
    . x: input samples
    Return:
    . polyX: polyphase data structure with size (Ndown, Nxp) for Ndown branches and Nxp samples from x per branch.
    """
    Lx = len(x)
    Nx, Nxp = poly_structure_size_for_downsampling_whole_x(Lx, Ndown)

    polyX = np.zeros(Ndown * Nxp)
    polyX[Ndown - 1] = x[0]  # prepend x with Ndown - 1 zeros
    polyX[Ndown:] = x[1 : Nx]
    polyX = polyX.reshape(Nxp, Ndown).T
    return polyX, Nx, Nxp


Eric Kooistra's avatar
Eric Kooistra committed
def upsample(x, Nup, coefs, verify=False, verbosity=1):  # interpolate
    """Upsample x by factor U = Nup and LPF

    Input:
    . x: Input signal x[n]
    . Nup: upsample (interpolation) factor
    . coefs: prototype FIR filter coefficients for anti aliasing LPF
    . verify: when True then verify that output y is the same when calculated directly or when calculated using the
              polyphase implementation.
Eric Kooistra's avatar
Eric Kooistra committed
    - verbosity: when > 0 print() status, else no print()
    Return:
    . y: Upsampled output signal y[m]

    Assumptions:
    . x[n] = 0 for n < 0
    . m = n * U, so first upsampled output sample starts at m = 0 based on n = 0
    . insert U - 1 zeros after each x[n] to get v[m], so len(v) = len(y) = U * len(x)
    . use coefs as anti aliasing filter, must be LPF with BW < fNyquist / U
    . len(coefs) typically is multiple of Nup. If shorter, then the coefs are extended with zeros.

    Remarks:
    . The input sample period is ts and the output sample period of the upsampled (= interpolated signal) is tsUp =
      ts / U
    . The group delay is (Ncoefs - 1) / 2 * tsUp = (Ncoefs - 1) / U / 2 * ts. With odd Ncoefs and symmetrical coefs
      to have linear phase, the group delay is an integer number of tsUp periods, so:
      - when (Ncoefs - 1) %  2      == 0, then the group delay is an integer number of tsUp periods
      - when (Ncoefs - 1) % (2 * U) == 0, then the group delay is an integer number of ts periods

    Procedure:

      x[n]:   0           1           2           3 --> time n with unit Ts

      v[m] = x[m / U], for m = 0, +-U, +-2U, ...
      v[m]:   0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 --> time m with unit Ts / U
                                                  |  |  |  |
      h[k]:     11 10  9  8  7  6  5  4  3  2  1  0  |  |  | --> coefs for m = 12
                   11 10  9  8  7  6  5  4  3  2  1  0  |  | --> coefs for m = 13
                      11 10  9  8  7  6  5  4  3  2  1  0  | --> coefs for m = 14
                         11 10  9  8  7  6  5  4  3  2  1  0 --> coefs for m = 15

      y[m]:   0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 --> time m with unit Ts / U

            Calculate y[0, 1, 2, 3] at x[0]
              y[0] = h[0] x[0]
              y[1] = h[1] x[0]
              y[2] = h[2] x[0]
              y[3] = h[3] x[0]

            Calculate y[4, 5 ,6 ,7] at x[1]
              y[ 4] = h[0] x[1] + h[4] x[0]
              y[ 5] = h[1] x[1] + h[5] x[0]
              y[ 6] = h[2] x[1] + h[6] x[0]
              y[ 7] = h[3] x[1] + h[7] x[0]

            Calculate y[8, 9, 10, 11] at x[2]
              y[ 8] = h[0] x[2] + h[4] x[1] + h[ 8] x[0]
              y[ 9] = h[1] x[2] + h[5] x[1] + h[ 9] x[0]
              y[10] = h[2] x[2] + h[6] x[1] + h[10] x[0]
              y[11] = h[3] x[2] + h[7] x[1] + h[11] x[0]

            Calculate y[12, 13, 14, 15] at x[3], see '|' markers between v[m] (is zero padded x[n]) and h[k] above
              y[12] = h[0] x[3] + h[4] x[2] + h[ 8] x[1]
              y[13] = h[1] x[3] + h[5] x[2] + h[ 9] x[1]
              y[14] = h[2] x[3] + h[6] x[2] + h[10] x[1]
              y[15] = h[3] x[3] + h[7] x[2] + h[11] x[1]

        ==> Calculate y[n * U + [0, ..., U - 1]] at x[n]
              y[n * U + 0] = lfilter(h[0, 4, 8], [1], x)   # p = 0
              y[n * U + 1] = lfilter(h[1, 5, 9], [1], x)   # p = 1
              y[n * U + 2] = lfilter(h[2, 6,10], [1], x)   # p = 2
              y[n * U + 3] = lfilter(h[3, 7,11], [1], x)   # p = 3
    a = [1.0]  # FIR b = coefs, a = 1

    # Polyphase implementation to avoid multiply by zero values
    #   coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
    #   polyCoefs = [[ 0,  4,  8],   # p = 0
    #                [ 1,  5,  9],   # p = 1
    #                [ 2,  6, 10],   # p = 2
    #                [ 3,  7, 11]])  # p = 3
    pfs = PolyPhaseFirFilterStructure(Nup, coefs, commutator='up')
    polyCoefs = pfs.polyCoefs

    # Poly phases for data
    # . Use polyX for whole data signal x with Nx samples, instead of pfs.polyDelays per pfs.Ntaps, to be able to use
    #   signal.lfilter() per branch for whole data signal x.
    # Filter x per polyphase, index p = 0, 1, .., Nup - 1, order in polyCoefs already accounts for the commutator
    # direction [LYONS Fig 10.13, 10.23, 10.23 seems to have commutator arrows in wrong direction], [CROCHIERE Fig
    # 3.25], [HARRIS 7.11, 7.12]
    for p in range(Nup):
        polyY[p] = signal.lfilter(polyCoefs[p], a, x)

    # Output Nup samples for every input sample
    y = polyY.T.reshape(1, Nup * Nx)[0]

    if verify:
        # Inefficient upsampling implementation with multiply by zero values
        xZeros = np.zeros((Nup, Nx))
        xZeros[0] = x
        xZeros = xZeros.T.reshape(1, Nup * Nx)[0]  # upsample
        yVerify = signal.lfilter(coefs, a, xZeros)  # LPF
        if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
            print('PASSED: correct upsample result')
        else:
            print('ERROR: wrong upsample result')

Eric Kooistra's avatar
Eric Kooistra committed
    if verbosity:
        print('> upsample():')
        print('. Nup    =', str(Nup))
        print('. Nx     =', str(Nx))
        print('. len(y) =', str(len(y)))
Eric Kooistra's avatar
Eric Kooistra committed
        print('')
Eric Kooistra's avatar
Eric Kooistra committed
def downsample(x, Ndown, coefs, verify=False, verbosity=1):  # decimate
    """LPF and downsample x by factor D = Ndown

    Input:
    . x: Input signal x[n]
    . Ndown: downsample (decimation) factor
    . coefs: prototype FIR filter coefficients for anti aliasing LPF
    . verify: when True then verify that output y is the same when calculated directly or when calculated using the
              polyphase implementation.
Eric Kooistra's avatar
Eric Kooistra committed
    - verbosity: when > 0 print() status, else no print()
    Return:
    . y: Downsampled output signal y[m]

    Assumptions:
    . x[n] = 0 for n < 0
    . m = n // D, so first downsampled output sample starts at m = 0 based on n = 0
    . no y[m] for m < 0
    . skip D - 1 values zeros after each x[n] to get y[m], so len(y) = len(x) // D
    . use coefs as anti aliasing filter, must be LPF with BW < fNyquist / D
    . len(coefs) typically is multiple of Ndown. If shorter, then the coefs are extended with zeros.

    Remarks:
    . The input sample period is ts and the output sample period of the downsampled (= decimated signal) is tsDown =
      ts * D
    . The group delay is (Ncoefs - 1) / 2 * ts. With odd Ncoefs and symmetrical coefs to have linear phase, the group
      delay is an integer number of ts periods, so:
      - when (Ncoefs - 1) %  2      == 0, then the group delay is an integer number of ts periods
      - when (Ncoefs - 1) % (2 * D) == 0, then the group delay is an integer number of tsDown periods

    Procedure:

      x[n]:   0  1  2  3  4  5  6  7  8  9 10 11 12  --> time n with unit Ts
                                                  |
      h[k]:     11 10  9  8  7  6  5  4  3  2  1  0  --> coefs for y[m] at m = 3

                          8           4           0  --> coefs for v[n] at n = 12
                       9           5           1     --> coefs for v[n] at n = 11
                   10           6           2        --> coefs for v[n] at n = 10
                11           7           3           --> coefs for v[n] at n =  9

      v[n]:   0  1  2  3  4  5  6  7  8  9 10 11 12

      y[m]:   0           1           2           3  --> time m with unit Ts * D

            Remove    v[-3] = h[3] x[-3]                          at x[-3]
            Remove    v[-2] = h[2] x[-2]                          at x[-2]
            Remove    v[-1] = h[1] x[-1]                          at x[-1]
            Calculate v[ 0] = h[0] x[ 0]                          at x[ 0]

            Calculate v[ 1] = h[3] x[ 1]                          at x[ 1]
            Calculate v[ 2] = h[2] x[ 2]                          at x[ 2]
            Calculate v[ 3] = h[1] x[ 3]                          at x[ 3]
            Calculate v[ 4] = h[0] x[ 4] + h[4] x[0]              at x[ 4]

            Calculate v[ 5] = h[3] x[ 5] + h[7] x[1]              at x[ 5]
            Calculate v[ 6] = h[2] x[ 6] + h[6] x[2]              at x[ 6]
            Calculate v[ 7] = h[1] x[ 7] + h[5] x[3]              at x[ 7]
            Calculate v[ 8] = h[0] x[ 8] + h[4] x[4] + h[ 8] x[0] at x[ 8]

            Calculate v[ 9] = h[3] x[ 9] + h[7] x[5] + h[11] x[1] at x[ 9]
            Calculate v[10] = h[2] x[10] + h[6] x[6] + h[10] x[2] at x[10]
            Calculate v[11] = h[1] x[11] + h[5] x[7] + h[ 9] x[3] at x[11]
            Calculate v[12] = h[0] x[12] + h[4] x[8] + h[ 8] x[4] at x[12]

        ==> Calculate y[m] at sum v[n - p] at x[n] for n = m * D and p = 3, 2, 1, 0

                       Ndown-1
              y[m] =     sum   v[m * D - p]
                        p = 0

                   with:
                                [n - p]                                 [p]
                               v[n - 3] = lfilter(h[3, 7,11], [1], polyX[0])
                               v[n - 2] = lfilter(h[2, 6,10], [1], polyX[1])
                               v[n - 1] = lfilter(h[1, 5, 9], [1], polyX[2])
                               v[n - 0] = lfilter(h[0, 4, 8], [1], polyX[3])
    """
    a = [1.0]  # FIR b = coefs, a = 1

    # Polyphase implementation to avoid calculating values that are removed
    #   coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
    #   polyCoefs = [[ 3,  7, 11],
    #                [ 2,  6, 10],
    #                [ 1,  5,  9],
    #                [ 0,  4,  8]])
    pfs = PolyPhaseFirFilterStructure(Ndown, coefs, commutator='down')
    polyCoefs = pfs.polyCoefs

    # Poly phases for whole data signal x, prepended with Ndown - 1 zeros
    polyX, Nx, Nxp = poly_structure_data_for_downsampling_whole_x(x, Ndown)  # size (Ndown, Nxp), Ny = Nxp
    # Filter x per polyphase, index p = 0, 1, .., Ndown - 1, the row order in polyCoefs already accounts for the
    # commutator direction. [HARRIS Fig 6.12, 6.13], [LYONS Fig 10.14, 10.22], [CROCHIERE Fig 3.29]
    polyY = np.zeros((Ndown, Nxp))
    for p in range(Ndown):
        polyY[p] = signal.lfilter(polyCoefs[p], a, polyX[p])

    # Sum the branch outputs to get single downsampled output value
        # Inefficient downsampling implementation with calculating values that are removed
        yVerify = np.zeros(Ndown * Nxp)
        yVerify[0 : Nx] = signal.lfilter(coefs, a, x[0 : Nx])  # LPF
        yVerify = yVerify.reshape(Nxp, Ndown).T   # poly phases
        yVerify = yVerify[0]   # downsample
        if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
            print('PASSED: correct downsample result')
        else:
            print('ERROR: wrong downsample result')

Eric Kooistra's avatar
Eric Kooistra committed
    if verbosity:
        print('> downsample():')
        print('. len(x) =', str(len(x)))
        print('. Ndown  =', str(Ndown))
        print('. Nx     =', str(Nx))
        print('. Nxp    =', str(Nxp))
        print('. len(y) =', str(len(y)))  # = Nxp
        print('')
    return y


def downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1):
    """BPF at bin k of Ndft and downsample x by factor D = Ndown [HARRIS Fig 6.14]

    Implement downsampling down converter for one bin [HARRIS Fig 6.14].

    . see downsample()

    Input:
    . x: Input signal x[n]
    . Ndown: downsample (decimation) factor
    . k: Index of BPF center frequency w_k = 2 pi k / Ndft
    . Ndft: number of frequency bins, DFT size
    . coefs: prototype FIR filter coefficients for anti aliasing BPF
    - verbosity: when > 0 print() status, else no print()
    Return:
    . y: Downsampled and down converted output signal y[m], m = n // D for bin
         k. Complex baseband signal.
    """
    a = [1.0]  # FIR b = coefs, a = 1

    # Polyphase implementation to avoid calculating values that are removed
    #   coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
    #   polyCoefs = [[ 3,  7, 11],
    #                [ 2,  6, 10],
    #                [ 1,  5,  9],
    #                [ 0,  4,  8]])
    pfs = PolyPhaseFirFilterStructure(Ndown, coefs, commutator='down')
    polyCoefs = pfs.polyCoefs

    # Poly phases for whole data signal x, prepended with Ndown - 1 zeros
    polyX, Nx, Nxp = poly_structure_data_for_downsampling_whole_x(x, Ndown)  # size (Ndown, Nxp), Ny = Nxp

    # Filter x per polyphase, order in polyCoefs accounts for commutator [HARRIS Fig 6.12, 6.13]
    polyY = np.zeros((Ndown, Nxp))
    for p in range(Ndown):
        polyY[p] = signal.lfilter(polyCoefs[p], a, polyX[p])

    # Phase rotate per polyphase, due to delay line at branch inputs [HARRIS Eq 6.8]
    polyYC = np.zeros((Ndown, Nxp), dtype='cfloat')
    for p in range(Ndown):
        polyYC[p] = polyY[p] * np.exp(1j * 2 * np.pi * (Ndown - 1 - p) * k / Ndown)

    # Sum the branch outputs to get single downsampled and downconverted output value
    y = np.sum(polyYC, axis=0)

    if verbosity:
        print('> downsample_bpf():')
        print('. len(x) =', str(len(x)))
        print('. Ndown  =', str(Ndown))
        print('. Nx     =', str(Nx))
        print('. Nxp    =', str(Nxp))
        print('. len(y) =', str(len(y)))  # = Nxp
        print('. Ndft   =', str(Ndft))
        print('. k      =', str(k))
Eric Kooistra's avatar
Eric Kooistra committed
        print('')
Eric Kooistra's avatar
Eric Kooistra committed
def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1):  # interpolate and decimate by Nup / Ndown
    """Resample x by factor U / D = Nup / Ndown

    x[n] --> Nup --> v[m] --> LPF --> w[m] --> Ndown --> y[k]

    The w[m] is the result of upsampling x[n] by Nup. The LPF has narrowed the pass band to suit both Nup and Ndown.
    Therefore the downsampling by Ndown implies that only one in every Ndown samples of w[m] needs to be calculated
    to get y[k] = w[k * D]. Hence resampling by Nup / Ndown takes less processing than upsampling when Ndown > 1.

    Poly phases in up and downsampling:
    The upsampling structure outputs the interpolated samples via the separate output phases. The same input sequence
    x is used to calculate all Nup phases, whereby each phase uses a different set of coefficients from the LPF.
    The downsampling structure sums the Ndown phases to output the decimated samples. Each phase uses a different set
    of coefficients from the LPF to filter Ndown delay phases of the input sequence x.

    Resampling is upsampling with downsampling by phase selection
    The resampling is done by first upsampling and then downsampling, because then only one shareed LPF is needed.
    For upsampling an LPF is always needed, because it has to construct the inserted Nup - 1 zero values. For
    downsampling the LPF of the upsampling already has restricted the input band width (BW), provided that the LPF
    has BW < fNyquist / U and BW < fNyquist / D. The downsampling therefore does not need an LPF and reduces to
    selecting appropriate output phase of the upsampler. The upsampler then only needs to calculate the output phase
    that will be used by the downsampler, instead of calculating all output phases.

    Input:
    . x: Input signal x[n]
    . Nup: upsample (interpolation) factor
    . Ndown: downsample (decimation) factor
    . coefs: FIR filter coefficients for anti aliasing LPF
    . verify: when True then verify that output y is the same when calculated directly or when calculated using the
              polyphase implementation.
Eric Kooistra's avatar
Eric Kooistra committed
    - verbosity: when > 0 print() status, else no print()
    Return:
    . y: Resampled output signal y[m]

    Assumptions:
    . x[n] = 0 for n < 0
    . k = m // D = (n * U) // D, so first resampled output sample starts at k = 0 based on n = 0
    . insert U - 1 zeros after each x[n] to get v[m], so len(v) = len(y) = U * len(x)
    . use coefs as anti aliasing filter, must be LPF with BW < fNyquist / U and BW < fNyquist / D
    . len(coefs) typically is multiple of Nup. If shorter, then the coefs are extended with zeros.

    Remarks:
    . The group delay is the same as for the upsampler, because the downsampling has no additional filtering.
    """
    Nx = len(x)
    Nv = Nup * Nx
    Nyp = (Nv + Ndown - 1) // Ndown   # Number of samples from v, per poly phase in polyY
    Ny = 1 + Ndown * (Nyp - 1)  # Used number of samples from v

    # Upsampling with LPF
    w = upsample(x, Nup, coefs, verify=False)
    # Downsampling selector
    # This model is not efficient, because the upsample() has calculated all Nup phases, whereas it only needs to
    # calculate the phase that is selected by the downsampling. However, it is considered not worth the effort to
    # make the model more efficient, because then only parts of lfilter() in upsample() need to be calulated, so
    # then lfilter() needs to be implemented manually and possibly in a for loop [section 3.3.4 CROCHIERE].
    downSelect = np.arange(0, Nyp) * Ndown
    y = w[downSelect]

    if verify:
        # Inefficient implementation
        # . Upsampling with multiply by zero values
        v = np.zeros((Nup, Nx))   # poly phases
        v[0] = x
        v = v.T.reshape(1, Nup * Nx)[0]  # upsample
        # . LPF
        a = [1.0]  # FIR b = coefs, a = 1
        w = signal.lfilter(coefs, a, v)
        # . Downsampling with calculating values that are removed
        yVerify = np.zeros(Ndown * Nyp)
        yVerify[0 : Ny] = w[0 : Ny]
        yVerify = yVerify.reshape(Nyp, Ndown).T   # poly phases
        yVerify = yVerify[0]   # downsample

        if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
            print('PASSED: correct resample result')
        else:
            print('ERROR: wrong resample result')

Eric Kooistra's avatar
Eric Kooistra committed
    if verbosity:
        print('> resample():')
        print('. len(x) =', str(len(x)))
        print('. Nx     =', str(Nx))
        print('. len(v) =', str(len(v)))
        print('. Ny     =', str(Ny))
        print('. Nyp    =', str(Nyp))
        print('. len(y) =', str(len(y)))
Eric Kooistra's avatar
Eric Kooistra committed
        print('')