Skip to content
Snippets Groups Projects
Select Git revision
  • 8e7e0e6e96865deb62b884d21b3cdeb0d44a6431
  • master default protected
  • image_support_for_boolean
  • image_support_lofar_fixes
  • image_support
  • moved-to-gitlab
  • remove-libpqxx-submodule
  • v0.11.2
  • v0.11.1
  • v0.11.0
  • v0.10.0
  • v0.9.1
  • v0.9.0
13 results

AttributeTraits.hpp

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    multirate.py 27.14 KiB
    #! /usr/bin/env python3
    ###############################################################################
    #
    # Copyright 2024
    # ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
    # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    ###############################################################################
    
    # Author: Eric Kooistra
    # Purpose: Multirate functions for DSP
    # Description:
    #
    # Usage:
    # . Use [2] to verify this code.
    #
    # References:
    # [1] dsp_study_erko.txt
    # [2] pfb_os/up_downsampling.ipynb, pfb_os/multirate_mixer.ipynb
    #
    # Books:
    # . HARRIS 6
    # . LYONS
    # . CROCHIERE
    
    import numpy as np
    from sys import exit
    from scipy import signal
    from .utilities import c_rtol, c_atol, ceil_div
    
    c_firA = [1.0]  # FIR b = coefs, a = 1
    
    
    def zeros(shape, cmplx=False):
        if cmplx:
            return np.zeros(shape, dtype='cfloat')
        else:
            return np.zeros(shape)
    
    
    def ones(shape, cmplx=False):
        return zeros(shape, cmplx) + 1
    
    
    def fold(x):
        """Circular reverse sequence around index 0.
    
        x[n] --> flip(x) --> x[N - n - 1]
        x[n] --> fold(x) -> x[-n] = x[N - n] = roll(flip(x), 1)
    
        The index is modulo N, so x[N - 0] = x[N] = x[0].
        """
        return np.roll(np.flip(x), 1)
    
    
    def down(x, D, phase=0):
        """Downsample x[n] by factor D, xD[m] = x[m D], m = n // D
    
        Keep every D-th sample in x and skip every D-1 samples after that sample.
        """
        xD = x[phase::D]
        return xD
    
    
    def up(x, U):
        """Upsample x by factor U, xU[m] = x[n] when m = n U, else 0.
    
        After every sample in x insert U-1 zeros.
        """
        xU = zeros(len(x) * U, np.iscomplexobj(x))
        xU[0::U] = x
        return xU
    
    
    ###############################################################################
    # Polyphase filter structure for input block processing
    ###############################################################################
    
    class PolyPhaseFirFilterStructure:
        """Polyphase FIR filter structure (PFS) per block of data
    
        Purpose of PFS implementation is to avoid multiply by zero values in case
        of upsampling and to avoid calculating samples that will be discarded in
        case of downsampling. The output result of the PFS FIR is identical to
        using the FIR filter. The spectral aliasing and spectral replication are
        due to the sample rate change, not to the implementation structure.
    
        The FIR data can be real to process real data, or complex to process
        equivalent baseband data. The FIR coefficients are real.
    
        The PFS is is suitable for downsampling and for upsampling:
        - Upsampling uses the PFS as Transposed Direct-Form FIR filter, where the
          commutator selects the polyphases from phase 0 to Nphases - 1 to yield
          output samples for every input sample [HARRIS Fig 7.11, 7.12, CROCHIERE
          Fig 3.25].
        - Downsampling uses the PFS as a Direct-Form FIR filter, where the
          commutator selects the polyphases from phase Nphases - 1 downto 0 to sum
          samples from the past for one output sample [HARRIS Fig 6.12, 6.13,
          CROCHIERE Fig 3.29].
        - In both cases the commutator rotates counter clockwise, because the PFS
          uses a delay line of z^(-1) called type 1 structure [CROCHIERE 3.3.3,
          VAIDYANATHAN 4.3].
    
        Input:
        . coefs: Real FIR coefficients b[0 : Nphases * Ntaps - 1].
        . Nphases: number of polyphases, is number of rows (axis=0) in the
                   polyphase structure.
        . cmplx: Define PFS delays for real (when False) or complex (when True)
                 FIR data.
        Derived:
        . Ntaps: number of taps per polyphase, is number of columns (axis=1) in
          the polyphase structure.
        . polyCoefs: FIR coefficients storage with Nphases rows and Ntaps columns.
        . polyDelays: FIR taps storage with Nphases rows and Ntaps columns.
    
        Storage of FIR coefficients and FIR data:
          Stream of input samples x[n], n >= 0 for increasing time. Show index n
          also as value so x[n] = n:
    
                     oldest sample                             newest sample
            x         v                                                  v
            samples  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...]
            blocks   [         0,          1,            2,              3]
                                                                         ^
                                                               newest block
    
          The delayLine index corresponds to the FIR coefficient index k of
          b[k] = impulse response h[k]. For Ncoefs = 12, with Nphases = 4 rows and
          Ntaps = 3 columns:
    
            polyCoefs = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11] = b[k]
                                     row = polyphase
            polyCoefs = [[0, 4, 8],    0
                         [1, 5, 9],    1
                         [2, 6,10],    2
                         [3, 7,11]]    3
                 column:  0, 1, 2
    
          Shift x[n] samples into delayLine from left, show sample index n as value
          in round brackets (n) and delayLine index k in [k] with square brackets.
          Shift in per sample or per block. The block can also be as large as the
          entire delayLine when the coefs are used as window.
    
            shift in -->(15,14,13,12,11,10, 9, 8, 7, 6, 5, 4) --> shift out
            delayLine = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11]
    
          For polyDelays shift in from left-top (n = 15 is newest input sample)
          and shift out from right-bottom (n = 4 is oldest sample).
    
                           v              v
            polyDelays = [[0, 4, 8],   ((15,11, 7),
                          [1, 5, 9],    (14,10, 6),
                          [2, 6,10],    (13, 9, 5),
                          [3, 7,11]]    (12, 8, 4))
                                 v              v
    
          Output sample y[n] = sum(polyCoefs * polyDelays).
    
          For downsampling by Ndown = Nphases shift blocks of Ndown samples in
          from left in polyDelays, put newest block[3] = x[12,13,14,15] in first
          column [0] of polyDelays.
    
          Store input samples in polyDelays structure, use mapping to access as
          delayLine:
            . map_to_delay_line()
            . map_to_poly_delays()
        """
        def __init__(self, Nphases, coefs, cmplx=False):
            self.coefs = coefs
            self.Ncoefs = len(coefs)
            self.Nphases = Nphases  # branches, rows
            self.Ntaps = ceil_div(self.Ncoefs, Nphases)  # taps, columns
            self.Ndelays = self.Ntaps * self.Nphases  # zero padded coefs
            self.cmplx = cmplx
            self.init_poly_coeffs()
            self.reset_poly_delays()
    
        def init_poly_coeffs(self):
            """Polyphase structure for FIR filter coefficients coefs in Nphases.
    
            Input:
            . coefs = prototype FIR filter coefficients (= b[k] = impulse response)
            Return:
            . If Ncoefs < Nphases * Ntaps, then pad coefs with zeros.
            . E.g. coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
                   coefs.reshape((4, 3)) = [[0, 1, 2],
                                            [3, 4, 5],
                                            [6, 7, 8],
                                            [9,10,11])
            . Therefore for Nphases = 4 rows and Ntaps = 3 columns use:
              polyCoefs = coefs.reshape((3, 4)).T
                        = [[0, 4, 8],    # p = 0, H0(z)
                           [1, 5, 9],    # p = 1, H1(z)
                           [2, 6,10],    # p = 2, H2(z)
                           [3, 7,11])    # p = 3, H3(z),
                                           Nphases = 4 rows (axis=0)
                                           Ntaps = 3 columns (axis=1)
            """
            coefsLine = np.zeros(self.Ndelays)  # real coefficients values
            coefsLine[0 : self.Ncoefs] = self.coefs
            self.polyCoefs = coefsLine.reshape((self.Ntaps, self.Nphases)).T
    
        def reset_poly_delays(self):
            self.polyDelays = zeros((self.Nphases, self.Ntaps), self.cmplx)
    
        def map_to_delay_line(self, polyDelays=None):
            if polyDelays is None:
                delayLine = self.polyDelays.T.reshape(-1)
            else:
                delayLine = polyDelays.T.reshape(-1)
            return delayLine
    
        def map_to_poly_delays(self, delayLine):
            self.polyDelays = delayLine.reshape((self.Ntaps, self.Nphases)).T
    
        def shift_in_data(self, inData):
            """Shift block of data into the polyDelays structure.
    
            View polyDelays as delay line if L < Nphases. Shift in from the left
            in the delay line, so last (= newest) sample in inData will appear at
            most left of the delay line. If L = Nphases, then it is possible to
            shift the data directly into the first (= left) column of polyDelays.
    
            Input:
            . inData: Block of one or more input samples with index as time index
                n in inData[n], so oldest sample at index 0 and newest sample at
                index -1.
            """
            L = len(inData)
            if L != self.Nphases:
                delayLine = self.map_to_delay_line()
                # Equivalent code:
                # delayLine = np.concatenate((inData, delayLine[L:]))
                delayLine = np.roll(delayLine, L)
                delayLine[:L] = inData
                self.map_to_poly_delays(delayLine)
            else:
                # Equivalent code for L == Nphases: Shift in inData block directly
                # at column 0
                self.polyDelays = np.roll(self.polyDelays, 1, axis=1)
                self.polyDelays[:, 0] = inData
    
        def parallel_load(self, inData):
            """Load block of data into each tap of the polyDelays structure.
    
            Length L of inData is L = Nphases, so load the data directly at
            each column of polyDelays.
    
            Input:
            . inData: Block of input samples with index as time index n in
                inData[n], so oldest sample at index 0 and newest sample at
                index -1.
            """
            for tap in range(self.Ntaps):
                self.polyDelays[:, tap] = inData
    
        def filter_block(self, inData):
            """Filter block of inData per polyphase.
    
            For upsampling the inData should contain Nphases = Nup copies of the
            input time sample.
            For downsampling the inData should contain Nphases = Ndown input time
            samples.
    
            Input:
            . inData: block of input data with length L. The time index n in
                inData[n] increments, so inData[0] is oldest data and inData[-1]
                is newest data.
            Return:
            . pfsData: block of polyphase FIR filtered output data for Nphases,
                with pfsData[p] and p = 0:Nphases-1 from top to bottom.
            """
            # Shift in one block of input data
            self.shift_in_data(inData)
            # Apply FIR coefs per delay element
            zPoly = self.polyDelays * self.polyCoefs
            # Sum FIR taps per polyphase
            pfsData = np.sum(zPoly, axis=1)
            # Output block of Nphases filtered output data
            return pfsData
    
    
    ###############################################################################
    # Up, down, resample low pass input signal
    ###############################################################################
    
    def polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros):
        """Polyphase data structure for downsampling whole signal x.
    
        The polyphase structure has Ndown branches and branch size suitable to use
        lfilter() once per polyphase branch for whole signal x. Instead of using
        pfs.polyDelays per pfs.Ntaps.
        . Prepend x with Nzeros = Ndown - 1 zeros to have first down sampled sample
          at m = 0 start at n = 0.
        . Skip any remaining last samples from x, that are not enough to yield a
          new output FIR sum.
    
        Input:
        . x: all input samples x[n] for n = 0 : Lx - 1
        . Ndown: downsample rate and number of polyphase branches
        . Nzeros: number of zero samples to prepend for x
        Return:
        . polyX: polyphase data structure with size (Ndown, Nxp) for Ndown branches
            and Nxp samples from x per branch.
        . lineX: Nx samples from x prepended with Nzeros, same content as polyX
        . Nx: Total number of samples from x, including prepended Ndown - 1 zeros.
        . Nxp: Total number of samples used from x per polyphase branch, is the
            number of samples Ny, that will be in downsampled output y[m] = x[m D],
            for m = 0, 1, 2, ..., Nxp - 1.
        """
        Lx = len(x)
        Nxp = (Nzeros + Lx) // Ndown  # Number of samples per polyphase
        Nx = Ndown * Nxp - Nzeros  # Used number of samples from x
    
        # Load x into polyX with Ndown rows = polyphases
        # . prepend x with Nzeros zeros
        # . skip any last remaining samples from x, that are not enough yield a new
        #   output FIR sum.
        lineX = zeros(Ndown * Nxp, np.iscomplexobj(x))
        lineX[Nzeros:] = x[0 : Nx]
        # . Store data in time order per branch, so with oldest data left, to match
        #   use with lfilter(). Note this differs from order of tap data in
        #   pfs.polyDelays where newest data is left, because the block data is
        #   flipped by shift_in_data(), to match use in pfs.filter_block(), so that
        #   it can use pfs.polyDelays * pfs.polyCoefs to filter the block.
        # . The newest sample x[-1] has to be in top branch of polyX and the oldest
        #   sample x[0] in bottom branch, to match branch order of 0:Ndown-1 from
        #   top to bottom in the pfs.polyCoefs, therefore do flipud(polyX). This
        #   flipud() is similar as the flip() per block in shift_in_data().
        #
        #         x[0] is oldest                              x[-1] is newest
        #           |                                            |
        #   x[n] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15]
        #          (0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15), x[n] = n
        #                                                              newest
        #   poly                 poly        v                             |
        #   Coefs = [[0, 4, 8],  Delays = ((15,11, 7),  polyX = ((3, 7,11,15),
        #            [1, 5, 9],            (14,10, 6),           (2, 6,10,14),
        #            [2, 6,10],            (13, 9, 5),           (1, 5, 9,13),
        #            [3, 7,11]]            (12, 8, 4))           (0, 4, 8,12))
        #                                          v              |
        #                                                       oldest
        polyX = np.flipud(lineX.reshape(Nxp, Ndown).T)
        return polyX, lineX, Nx, Nxp
    
    
    def polyphase_frontend(x, Nphases, coefs, sampling):
        """Calculate PFS FIR filter output of x for Nphases.
    
        The polyY[Nphases] output of this PFS frontend can be:
        . summed for dowsampling by factor D = Nphases
        . used as is for upsampling by factor U = Nphases
        . used with a range of Nphases LOs for a single channel downsampler
          downconverter, or upsampler upconverter
        . used with a IDFT for a analysis filterbank (PFB), or synthesis PFB.
    
        Input:
        . x: Input signal x[n]
        . Nphases: number of polyphase branches in PFS
        . coefs: prototype FIR filter coefficients for anti aliasing LPF. The
            len(coefs) typically is multiple of Nphases. If shorter, then the
            coefs are extended with zeros.
        . sampling: 'up' or 'down'
        Return:
        . polyY[Nphases]: Output of the FIR filtered branches in the PFS.
        . Nx = np.size(polyY), total number of samples from x
        . Nxp = np.size(polyY, axis=1), total number of samples from x per
            polyphase.
        """
        # Use polyphase FIR filter coefficients from pfs class.
        pfs = PolyPhaseFirFilterStructure(Nphases, coefs, np.iscomplexobj(x))
    
        # Define polyphases for whole data signal x with Nx samples, instead of
        # using polyDelays per Ntaps from pfs class, to be able to use
        # signal.lfilter() per polyphase branch for the whole data signal x.
        if sampling == 'up':
            Nx = len(x)
            Nxp = Nx
            Nup = Nphases
            # Filter whole x per polyphase, so Nxp = Nx, because the Nup branches
            # each use all x to yield Nup output samples per input sample from x.
            # The commutator index order for upsampling is p = 0, 1, .., Nup - 1,
            # so from top to bottom in the PFS.
            polyY = zeros((Nup, Nx), np.iscomplexobj(x))
    
            pCommutator = range(Nup)
            for p in pCommutator:
                polyY[p] = Nup * signal.lfilter(pfs.polyCoefs[p], c_firA, x)
        else:
            # 'down':
            # . Prepend x with Ndown - 1 zeros, to have y[m] for m = 0 start at
            #   n = 0 of x[n].
            # . Size of polyX is (Ndown, Nxp), and length of y is Ny = Nxp is
            #   length of each branch.
            Ndown = Nphases
            Nzeros = Ndown - 1
            polyX, _, Nx, Nxp = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros)
            # print(polyX[:, 0])
            # Filter Ndown parts of x per polyphase, because the FIR filter output
            # y will sum. The commutator index order for downsampling is p =
            # Ndown - 1,..., 1, 0, so from bottom to top in the PFS. However, the
            # commutator index order is only relevant when the branches are
            # calculated sequentially to reuse the same hardware, because for the
            # output y the branches are summed anyway.
            polyY = zeros((Ndown, Nxp), np.iscomplexobj(x))
            pCommutator = np.flip(np.arange(Ndown))
            for p in pCommutator:
                polyY[p] = signal.lfilter(pfs.polyCoefs[p], c_firA, polyX[p])
        return polyY, Nx, Nxp
    
    
    def upsample(x, Nup, coefs, verify=False, verbosity=1):  # interpolate
        """Upsample x by factor U = Nup and LPF.
    
        Input:
        . x: Input signal x[n]
        . Nup: upsample (interpolation) factor
        . coefs: prototype FIR filter coefficients for anti aliasing LPF
        . verify: when True then verify that output y is the same when calculated
                  directly or when calculated using the polyphase implementation.
        - verbosity: when > 0 print() status, else no print()
        Return:
        . y: Upsampled output signal y[m]
    
        Assumptions:
        . x[n] = 0 for n < 0
        . m = n * U, so first upsampled output sample starts at m = 0 based on
          n = 0
        . no y[m] for m < 0
        . insert U - 1 zeros after each x[n] to get v[m], so len(v) = len(y) =
          U * len(x)
        . use coefs as anti aliasing filter, must be LPF with BW < fNyquist / U
        . len(coefs) typically is multiple of Nup. If shorter, then the coefs are
          extended with zeros.
    
        Remarks:
        . The input sample period is ts and the output sample period of the
          upsampled (= interpolated signal) is tsUp = ts / U
        . The group delay is (Ncoefs - 1) / 2 * tsUp = (Ncoefs - 1) / U / 2 * ts.
          With odd Ncoefs and symmetrical coefs to have linear phase, the group
          delay is an integer number of tsUp periods, so:
          - when (Ncoefs - 1) %  2 == 0, then the group delay is an integer number
            of tsUp periods
          - when (Ncoefs - 1) % (2 * U) == 0, then the group delay is an integer
            number of ts periods.
        """
        # Polyphase FIR filter input x
        polyY, Nx, _ = polyphase_frontend(x, Nup, coefs, 'up')
    
        # Output Nup samples y[m] for every input sample x[n]
        y = polyY.T.reshape(1, Nup * Nx)[0]
    
        if verify:
            # Inefficient upsampling implementation with multiply by zero values.
            # Verify that x --> U --> LPF --> y yields identical result y as with
            # using the PFS: x --> PFS FIR --> y.
            xZeros = zeros((Nup, Nx), np.iscomplexobj(x))
            xZeros[0] = x
            xZeros = xZeros.T.reshape(1, Nup * Nx)[0]  # upsample
            yVerify = Nup * signal.lfilter(coefs, c_firA, xZeros)  # LPF
            print('> Verify upsample():')
            if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
                print('  . PASSED: correct upsample result')
            else:
                exit('ERROR: wrong upsample result')
        if verbosity:
            print('> Log upsample():')
            print('  . Nup    =', str(Nup))
            print('  . Nx     =', str(Nx))
            print('  . len(y) =', str(len(y)))
            print('')
        return y
    
    
    def downsample(x, Ndown, coefs, verify=False, verbosity=1):  # decimate
        """LPF and downsample x by factor D = Ndown.
    
        Input:
        . x: Input signal x[n]
        . Ndown: downsample (decimation) factor
        . coefs: prototype FIR filter coefficients for anti aliasing LPF
        . verify: when True then verify that output y is the same when calculated
                  directly or when calculated using the polyphase implementation.
        - verbosity: when > 0 print() status, else no print()
        Return:
        . y: Downsampled output signal y[m]
    
        Assumptions:
        . x[n] = 0 for n < 0
        . m = n // D, so first downsampled output sample starts at m = 0 based on
          n = 0
        . no y[m] for m < 0
        . skip D - 1 values zeros after each x[n] to get y[m], so len(y) =
          len(x) // D
        . use coefs as anti aliasing filter, must be LPF with BW < fNyquist / D
        . len(coefs) typically is multiple of Ndown. If shorter, then the coefs
          are extended with zeros.
    
        Remarks:
        . The input sample period is ts and the output sample period of the
          downsampled (= decimated signal) is tsDown = ts * D
        . The group delay is (Ncoefs - 1) / 2 * ts. With odd Ncoefs and symmetrical
          coefs to have linear phase, the group delay is an integer number of ts
          periods, so:
          - when (Ncoefs - 1) %  2      == 0, then the group delay is an integer
            number of ts periods
          - when (Ncoefs - 1) % (2 * D) == 0, then the group delay is an integer
            number of tsDown periods
        """
        # Polyphase FIR filter input x
        polyY, Nx, Nxp = polyphase_frontend(x, Ndown, coefs, 'down')
    
        # FIR filter sum
        y = np.sum(polyY, axis=0)
    
        if verify:
            # Inefficient downsampling implementation with calculating values that
            # are removed. Verify that x --> LPF --> D --> y yields identical
            # result y as with using the PFS: x --> PFS FIR --> y.
            yVerify = zeros(Ndown * Nxp, np.iscomplexobj(x))
            yVerify[0 : Nx] = signal.lfilter(coefs, c_firA, x[0 : Nx])  # LPF
            yVerify = yVerify.reshape(Nxp, Ndown).T   # polyphases
            yVerify = yVerify[0]   # downsample by D
            print('> Verify downsample():')
            if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
                print('  . PASSED: correct downsample result')
            else:
                exit('ERROR: wrong downsample result')
        if verbosity:
            print('> Log downsample():')
            print('  . len(x) =', str(len(x)))
            print('  . Ndown  =', str(Ndown))
            print('  . Nx     =', str(Nx))
            print('  . Nxp    =', str(Nxp))
            print('  . len(y) =', str(len(y)))  # = Nxp
            print('')
        return y
    
    
    def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1):  # interpolate and decimate by Nup / Ndown
        """Resample x by factor U / D = Nup / Ndown
    
        x[n] --> Nup --> v[m] --> LPF --> w[m] --> Ndown --> y[k]
    
        The w[m] is the result of upsampling x[n] by Nup. The LPF has narrowed the
        pass band to suit both Nup and Ndown. Therefore the downsampling by Ndown
        implies that only one in every Ndown samples of w[m] needs to be calculated
        to get y[k] = w[k * D]. Hence resampling by Nup / Ndown takes less
        processing than upsampling when Ndown > 1.
    
        Poly phases in up and downsampling:
        The upsampling structure outputs the interpolated samples via the separate
        output phases. The same input sequence x is used to calculate all Nup
        phases, whereby each phase uses a different set of coefficients from the
        LPF. The downsampling structure sums the Ndown phases to output the
        downsampled samples. Each phase uses a different set of coefficients from
        the LPF to filter Ndown delay phases of the input sequence x.
    
        Resampling is upsampling with downsampling by phase selection:
        The resampling is done by first upsampling and then downsampling, because
        then only one shared LPF is needed. For upsampling an LPF is always
        needed, because it has to construct the inserted Nup - 1 zero values. For
        downsampling the LPF of the upsampling already has restricted the input
        band width (BW), provided that the LPF has BW < fNyquist / U and BW <
        fNyquist / D. The downsampling therefore does not need an LPF and reduces
        to selecting appropriate output phase of the upsampler. The upsampler then
        only needs to calculate the output phase that will be used by the
        downsampler, instead of calculating all output phases.
    
        Input:
        . x: Input signal x[n]
        . Nup: upsample (interpolation) factor
        . Ndown: downsample (decimation) factor
        . coefs: FIR filter coefficients for anti aliasing LPF
        . verify: when True then verify that output y is the same when calculated
                  directly or when calculated using the polyphase implementation.
        - verbosity: when > 0 print() status, else no print()
        Return:
        . y: Resampled output signal y[m]
    
        Assumptions:
        . x[n] = 0 for n < 0
        . k = m // D = (n * U) // D, so first resampled output sample starts at
          k = 0 based on n = 0
        . no y[k] for k < 0
        . insert U - 1 zeros after each x[n] to get v[m], so len(v) = len(y) =
          U * len(x)
        . use coefs as anti aliasing filter, must be LPF with BW < fNyquist / U
          and BW < fNyquist / D
        . len(coefs) typically is multiple of Nup. If shorter, then the coefs are
          extended with zeros.
    
        Remarks:
        . The group delay is the same as for the upsampler, because the
          downsampling has no additional filtering.
        """
        Nx = len(x)
        Nv = Nup * Nx
        # Nyp is number of samples from v, per polyphase in polyY
        Nyp = (Nv + Ndown - 1) // Ndown
        # Ny is used number of samples from v
        Ny = 1 + Ndown * (Nyp - 1)
    
        # Upsampling with LPF
        w = upsample(x, Nup, coefs, verify=False)
        # Downsampling selector
        # This model is not efficient, because the upsample() has calculated all
        # Nup phases, whereas it only needs to calculate the phase that is selected
        # by the downsampling. However, it is considered not worth the effort to
        # make the model more efficient, because then only parts of lfilter() in
        # upsample() need to be calulated, so then lfilter() needs to be
        # implemented manually and possibly in a for loop [CROCHIERE 3.3.4].
        downSelect = np.arange(0, Nyp) * Ndown
        y = w[downSelect]
    
        if verify:
            # Inefficient implementation
            # . Upsampling with multiply by zero values
            v = np.zeros((Nup, Nx))   # polyphases
            v[0] = x
            v = v.T.reshape(1, Nup * Nx)[0]  # upsample
            # . LPF
            w = Nup * signal.lfilter(coefs, c_firA, v)
            # . Downsampling with calculating values that are removed
            yVerify = np.zeros(Ndown * Nyp)
            yVerify[0 : Ny] = w[0 : Ny]
            yVerify = yVerify.reshape(Nyp, Ndown).T   # polyphases
            yVerify = yVerify[0]   # downsample
    
            print('> Verify resample():')
            if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
                print('  . PASSED: correct resample result')
            else:
                exit('ERROR: wrong resample result')
        if verbosity:
            print('> Log resample():')
            print('  . len(x) =', str(len(x)))
            print('  . Nx     =', str(Nx))
            print('  . len(v) =', str(len(v)))
            print('  . Ny     =', str(Ny))
            print('  . Nyp    =', str(Nyp))
            print('  . len(y) =', str(len(y)))
            print('')
        return y