Newer
Older
#! /usr/bin/env python3
###############################################################################
#
# Copyright 2024
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
# Author: Eric Kooistra
# Purpose: Multirate functions for DSP
# Description:
#
# Usage:
# . Use [2] to verify this code.
#
# References:
# [1] dsp_study_erko.txt
# [2] http://localhost:8888/notebooks/pfb_os/up_downsampling.ipynb
#
# Books:
# . HARRIS 6 title Figure
# . LYONS
# . CROCHIERE
import numpy as np
from scipy import signal
from .utilities import c_rtol, c_atol, ceil_div
def down(x, D, phase=0):
"""Downsample x[n] by factor D, xD[m] = x[m D], m = n // D
Keep every D-th sample in x and skip every D-1 samples after that sample.
"""
xD = x[phase::D]
return xD
def up(x, U):
"""Upsample x by factor U, xU[m] = x[n] when m = n U, else 0.
After every sample in x insert U-1 zeros.
"""
xU = np.zeros(len(x) * U)
xU[0::U] = x
return xU
###############################################################################
# Polyphase filter
###############################################################################
class PolyPhaseFirFilterStructure:
"""Polyphase FIR filter structure (PFS) per block of data
Input:
. Nphases: number of poly phases, is data block size, is number of rows
(axis=0) in polyphase structure
. coefs: FIR coefficients b[0 : Nphases * Ntaps - 1]
. commutator: 'up' or 'down', commutator 'up' selects the polyCoefs from
phase 0 : Nphases - 1, commutator 'down' first flips the phases order
in polyCoefs.
Derived:
. Ntaps: number of taps per poly phase, is number of columns (axis=1) in
the polyphase structure
Map LPF FIR coefs to polyCoefs with:
. flip rows (axis=0) to have first coefs[0] at last row to fit time order
of data block with newest sample at last index. Flipping polyCoefs once
here at init is probably more efficient, then flipping inData and outData
for every block.
Filter delay memory polyDelays:
. Shift in data blocks per column at column 0, last sample in data block
and in column is newest.
"""
def __init__(self, Nphases, coefs, commutator):
self.coefs = coefs
self.Ncoefs = len(coefs)
self.Nphases = Nphases # branches, rows
self.Ntaps = ceil_div(self.Ncoefs, Nphases) # taps, columns
# Apply polyCoefs branches in range(Nphases) order, because coefs
# mapping in poly_coeffs(commutator) already accounts for down or up.
self.polyCoefs = self.poly_coeffs(commutator)
# oldest sample[0]
# newest sample [15]
# samples [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
# blocks [ 0, 1, 2, 3]
# shift blocks to next column in polyDelays
# put newest block[3] in first column of polyDelays
# polyDelays = [[ 8, 4, 0],
# [ 9, 5, 1],
# [10, 6, 2],
# [11, 7, 3]])
self.polyDelays = np.zeros((Nphases, self.Ntaps))
def poly_coeffs(self, commutator):
"""Polyphase structure for FIR filter coefficients coefs in Nphases
Input:
. coefs = prototype FIR filter coefficients (impulse response)
. commutator: 'up' or 'down'
Return:
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
. polyCoefs:
. E.g. coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
. If Ncoefs < Ntaps * Nphases, then pad polyCoefs with zeros.
. Branch order of polyCoefs accounts for commutator, therefore the
polyCoefs branches can be applied in range(Nphases) order
independent of commutator direction.
. commutator: 'up', first output appears when the switch is at the
branch p = 0 and then yields a new output for every branch,
therefore p = 0 first in polyCoefs.
polyCoefs = [[ 0, 4, 8], # p = 0, H0(z)
[ 1, 5, 9], # p = 1, H1(z)
[ 2, 6, 10], # p = 2, H2(z)
[ 3, 7, 11]]) # p = 3, H3(z),
Nphases = 4 rows (axis=0)
Ntaps = 3 columns (axis=1)
. commutator: 'down', new FIR sum output starts being aggregated when
switch is at branch p = Nphases-1, therefore p = Nphases - 1 first
in polyCoefs
polyCoefs = [[ 3, 7, 11], # p = 3, H3(z)
[ 2, 6, 10], # p = 2, H2(z)
[ 1, 5, 9], # p = 1, H1(z)
[ 0, 4, 8]]) # p = 0, H0(z)
Nphases = 4 rows (axis=0)
Ntaps = 3 columns (axis=1)
"""
polyCoefs = np.zeros(self.Nphases * self.Ntaps)
polyCoefs[0 : self.Ncoefs] = self.coefs
polyCoefs = polyCoefs.reshape(self.Ntaps, self.Nphases).T
if commutator == 'down':
polyCoefs = np.flip(polyCoefs, axis=0)
return polyCoefs
def filter_block(self, inData):
"""Filter block of inData per poly phase
Input:
. inData: block of Nphases input data, time index n increments, so
inData[0] is oldest data and inData[Nphases-1] is newest data.
Return:
. outData: block of Nphases filtered output data with outData[0] is
oldest data and outData[Nphases-1] is newest data, so time index n
increments, like with inData.
"""
# Shift delay memory by one block
self.polyDelays = np.roll(self.polyDelays, 1, axis=1)
# Shift in inData block at column 0
self.polyDelays[:, 0] = inData
# Apply FIR coefs per element
zData = self.polyDelays * self.polyCoefs
outData = np.sum(zData, axis=1)
return outData
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def poly_structure_size_for_downsampling_whole_x(Lx, Ndown):
"""Polyphase structure size for downsampling a signal x with Lx samples
The polyphase structure has Ndown branches and branch size suitable to use lfilter() once per polyphase branch
for whole signal x. Instead of using pfs.polyDelays per pfs.Ntaps.
. Prepend x with Ndown - 1 zeros to have first down sampled sample at m = 0 start at n = 0.
. Skip any remaining last samples from x, that are not enough yield a new output FIR sum.
Input:
. Lx: len(x)
. Ndown: downsample rate
Return:
. Nx: Total number of samples from x, including prepended Ndown - 1 zeros.
. Nxp: Total number of samples used from x per polyphase branch, is Ny the number of samples that will be in
downsampled output y[m] = x[m D] for m = 0, 1, 2, ..., Nxp - 1
"""
# Numer of samples per polyphase
Nxp = (Ndown - 1 + Lx) // Ndown
# Used number of samples from x
Nx = 1 + Ndown * (Nxp - 1)
return Nx, Nxp
def poly_structure_data_for_downsampling_whole_x(x, Ndown):
"""Polyphase data structure for whole signal x
. see poly_structure_size_for_downsampling_whole_x()
Input:
. x: input samples
Return:
. polyX: polyphase data structure with size (Ndown, Nxp) for Ndown branches and Nxp samples from x per branch.
"""
Lx = len(x)
Nx, Nxp = poly_structure_size_for_downsampling_whole_x(Lx, Ndown)
polyX = np.zeros(Ndown * Nxp)
polyX[Ndown - 1] = x[0] # prepend x with Ndown - 1 zeros
polyX[Ndown:] = x[1 : Nx]
polyX = polyX.reshape(Nxp, Ndown).T
return polyX, Nx, Nxp
def upsample(x, Nup, coefs, verify=False, verbosity=1): # interpolate
"""Upsample x by factor U = Nup and LPF
Input:
. x: Input signal x[n]
. Nup: upsample (interpolation) factor
. coefs: prototype FIR filter coefficients for anti aliasing LPF
. verify: when True then verify that output y is the same when calculated directly or when calculated using the
polyphase implementation.
Return:
. y: Upsampled output signal y[m]
Assumptions:
. x[n] = 0 for n < 0
. m = n * U, so first upsampled output sample starts at m = 0 based on n = 0
. no y[m] for m < 0
. insert U - 1 zeros after each x[n] to get v[m], so len(v) = len(y) = U * len(x)
. use coefs as anti aliasing filter, must be LPF with BW < fNyquist / U
. len(coefs) typically is multiple of Nup. If shorter, then the coefs are extended with zeros.
Remarks:
. The input sample period is ts and the output sample period of the upsampled (= interpolated signal) is tsUp =
ts / U
. The group delay is (Ncoefs - 1) / 2 * tsUp = (Ncoefs - 1) / U / 2 * ts. With odd Ncoefs and symmetrical coefs
to have linear phase, the group delay is an integer number of tsUp periods, so:
- when (Ncoefs - 1) % 2 == 0, then the group delay is an integer number of tsUp periods
- when (Ncoefs - 1) % (2 * U) == 0, then the group delay is an integer number of ts periods
Procedure:
x[n]: 0 1 2 3 --> time n with unit Ts
v[m] = x[m / U], for m = 0, +-U, +-2U, ...
= 0, else
v[m]: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 --> time m with unit Ts / U
| | | |
h[k]: 11 10 9 8 7 6 5 4 3 2 1 0 | | | --> coefs for m = 12
11 10 9 8 7 6 5 4 3 2 1 0 | | --> coefs for m = 13
11 10 9 8 7 6 5 4 3 2 1 0 | --> coefs for m = 14
11 10 9 8 7 6 5 4 3 2 1 0 --> coefs for m = 15
y[m]: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 --> time m with unit Ts / U
Calculate y[0, 1, 2, 3] at x[0]
y[0] = h[0] x[0]
y[1] = h[1] x[0]
y[2] = h[2] x[0]
y[3] = h[3] x[0]
Calculate y[4, 5 ,6 ,7] at x[1]
y[ 4] = h[0] x[1] + h[4] x[0]
y[ 5] = h[1] x[1] + h[5] x[0]
y[ 6] = h[2] x[1] + h[6] x[0]
y[ 7] = h[3] x[1] + h[7] x[0]
Calculate y[8, 9, 10, 11] at x[2]
y[ 8] = h[0] x[2] + h[4] x[1] + h[ 8] x[0]
y[ 9] = h[1] x[2] + h[5] x[1] + h[ 9] x[0]
y[10] = h[2] x[2] + h[6] x[1] + h[10] x[0]
y[11] = h[3] x[2] + h[7] x[1] + h[11] x[0]
Calculate y[12, 13, 14, 15] at x[3], see '|' markers between v[m] (is zero padded x[n]) and h[k] above
y[12] = h[0] x[3] + h[4] x[2] + h[ 8] x[1]
y[13] = h[1] x[3] + h[5] x[2] + h[ 9] x[1]
y[14] = h[2] x[3] + h[6] x[2] + h[10] x[1]
y[15] = h[3] x[3] + h[7] x[2] + h[11] x[1]
==> Calculate y[n * U + [0, ..., U - 1]] at x[n]
y[n * U + 0] = lfilter(h[0, 4, 8], [1], x) # p = 0
y[n * U + 1] = lfilter(h[1, 5, 9], [1], x) # p = 1
y[n * U + 2] = lfilter(h[2, 6,10], [1], x) # p = 2
y[n * U + 3] = lfilter(h[3, 7,11], [1], x) # p = 3
"""
Nx = len(x)
# Polyphase implementation to avoid multiply by zero values
# coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# polyCoefs = [[ 0, 4, 8], # p = 0
# [ 1, 5, 9], # p = 1
# [ 2, 6, 10], # p = 2
# [ 3, 7, 11]]) # p = 3
pfs = PolyPhaseFirFilterStructure(Nup, coefs, commutator='up')
polyCoefs = pfs.polyCoefs
# Poly phases for data
# . Use polyX for whole data signal x with Nx samples, instead of pfs.polyDelays per pfs.Ntaps, to be able to use
# signal.lfilter() per branch for whole data signal x.
polyY = np.zeros((Nup, Nx))
# Filter x per polyphase, index p = 0, 1, .., Nup - 1, order in polyCoefs already accounts for the commutator
# direction [LYONS Fig 10.13, 10.23, 10.23 seems to have commutator arrows in wrong direction], [CROCHIERE Fig
# 3.25], [HARRIS 7.11, 7.12]
for p in range(Nup):
polyY[p] = signal.lfilter(polyCoefs[p], a, x)
# Output Nup samples for every input sample
y = polyY.T.reshape(1, Nup * Nx)[0]
if verify:
# Inefficient upsampling implementation with multiply by zero values
xZeros = np.zeros((Nup, Nx))
xZeros[0] = x
xZeros = xZeros.T.reshape(1, Nup * Nx)[0] # upsample
yVerify = signal.lfilter(coefs, a, xZeros) # LPF
if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
print('PASSED: correct upsample result')
else:
print('ERROR: wrong upsample result')
print('. Nup =', str(Nup))
print('. Nx =', str(Nx))
print('. len(y) =', str(len(y)))
return y
def downsample(x, Ndown, coefs, verify=False, verbosity=1): # decimate
"""LPF and downsample x by factor D = Ndown
Input:
. x: Input signal x[n]
. Ndown: downsample (decimation) factor
. coefs: prototype FIR filter coefficients for anti aliasing LPF
. verify: when True then verify that output y is the same when calculated directly or when calculated using the
polyphase implementation.
Return:
. y: Downsampled output signal y[m]
Assumptions:
. x[n] = 0 for n < 0
. m = n // D, so first downsampled output sample starts at m = 0 based on n = 0
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
. no y[m] for m < 0
. skip D - 1 values zeros after each x[n] to get y[m], so len(y) = len(x) // D
. use coefs as anti aliasing filter, must be LPF with BW < fNyquist / D
. len(coefs) typically is multiple of Ndown. If shorter, then the coefs are extended with zeros.
Remarks:
. The input sample period is ts and the output sample period of the downsampled (= decimated signal) is tsDown =
ts * D
. The group delay is (Ncoefs - 1) / 2 * ts. With odd Ncoefs and symmetrical coefs to have linear phase, the group
delay is an integer number of ts periods, so:
- when (Ncoefs - 1) % 2 == 0, then the group delay is an integer number of ts periods
- when (Ncoefs - 1) % (2 * D) == 0, then the group delay is an integer number of tsDown periods
Procedure:
x[n]: 0 1 2 3 4 5 6 7 8 9 10 11 12 --> time n with unit Ts
|
h[k]: 11 10 9 8 7 6 5 4 3 2 1 0 --> coefs for y[m] at m = 3
8 4 0 --> coefs for v[n] at n = 12
9 5 1 --> coefs for v[n] at n = 11
10 6 2 --> coefs for v[n] at n = 10
11 7 3 --> coefs for v[n] at n = 9
v[n]: 0 1 2 3 4 5 6 7 8 9 10 11 12
y[m]: 0 1 2 3 --> time m with unit Ts * D
Remove v[-3] = h[3] x[-3] at x[-3]
Remove v[-2] = h[2] x[-2] at x[-2]
Remove v[-1] = h[1] x[-1] at x[-1]
Calculate v[ 0] = h[0] x[ 0] at x[ 0]
Calculate v[ 1] = h[3] x[ 1] at x[ 1]
Calculate v[ 2] = h[2] x[ 2] at x[ 2]
Calculate v[ 3] = h[1] x[ 3] at x[ 3]
Calculate v[ 4] = h[0] x[ 4] + h[4] x[0] at x[ 4]
Calculate v[ 5] = h[3] x[ 5] + h[7] x[1] at x[ 5]
Calculate v[ 6] = h[2] x[ 6] + h[6] x[2] at x[ 6]
Calculate v[ 7] = h[1] x[ 7] + h[5] x[3] at x[ 7]
Calculate v[ 8] = h[0] x[ 8] + h[4] x[4] + h[ 8] x[0] at x[ 8]
Calculate v[ 9] = h[3] x[ 9] + h[7] x[5] + h[11] x[1] at x[ 9]
Calculate v[10] = h[2] x[10] + h[6] x[6] + h[10] x[2] at x[10]
Calculate v[11] = h[1] x[11] + h[5] x[7] + h[ 9] x[3] at x[11]
Calculate v[12] = h[0] x[12] + h[4] x[8] + h[ 8] x[4] at x[12]
==> Calculate y[m] at sum v[n - p] at x[n] for n = m * D and p = 3, 2, 1, 0
Ndown-1
y[m] = sum v[m * D - p]
p = 0
with:
[n - p] [p]
v[n - 3] = lfilter(h[3, 7,11], [1], polyX[0])
v[n - 2] = lfilter(h[2, 6,10], [1], polyX[1])
v[n - 1] = lfilter(h[1, 5, 9], [1], polyX[2])
v[n - 0] = lfilter(h[0, 4, 8], [1], polyX[3])
"""
# Polyphase implementation to avoid calculating values that are removed
# coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# polyCoefs = [[ 3, 7, 11],
# [ 2, 6, 10],
# [ 1, 5, 9],
# [ 0, 4, 8]])
pfs = PolyPhaseFirFilterStructure(Ndown, coefs, commutator='down')
polyCoefs = pfs.polyCoefs
# Poly phases for whole data signal x, prepended with Ndown - 1 zeros
polyX, Nx, Nxp = poly_structure_data_for_downsampling_whole_x(x, Ndown) # size (Ndown, Nxp), Ny = Nxp
# Filter x per polyphase, index p = 0, 1, .., Ndown - 1, the row order in polyCoefs already accounts for the
# commutator direction. [HARRIS Fig 6.12, 6.13], [LYONS Fig 10.14, 10.22], [CROCHIERE Fig 3.29]
polyY = np.zeros((Ndown, Nxp))
for p in range(Ndown):
polyY[p] = signal.lfilter(polyCoefs[p], a, polyX[p])
# Sum the branch outputs to get single downsampled output value
y = np.sum(polyY, axis=0)
if verify:
# Inefficient downsampling implementation with calculating values that are removed
yVerify = np.zeros(Ndown * Nxp)
yVerify[0 : Nx] = signal.lfilter(coefs, a, x[0 : Nx]) # LPF
yVerify = yVerify.reshape(Nxp, Ndown).T # poly phases
yVerify = yVerify[0] # downsample
if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
print('PASSED: correct downsample result')
else:
print('ERROR: wrong downsample result')
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
print('. len(x) =', str(len(x)))
print('. Ndown =', str(Ndown))
print('. Nx =', str(Nx))
print('. Nxp =', str(Nxp))
print('. len(y) =', str(len(y))) # = Nxp
print('')
return y
def downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1):
"""BPF at bin k of Ndft and downsample x by factor D = Ndown [HARRIS Fig 6.14]
Implement downsampling down converter for one bin [HARRIS Fig 6.14].
. see downsample()
Input:
. x: Input signal x[n]
. Ndown: downsample (decimation) factor
. k: Index of BPF center frequency w_k = 2 pi k / Ndft
. Ndft: number of frequency bins, DFT size
. coefs: prototype FIR filter coefficients for anti aliasing BPF
- verbosity: when > 0 print() status, else no print()
Return:
. y: Downsampled and down converted output signal y[m], m = n // D for bin
k. Complex baseband signal.
"""
a = [1.0] # FIR b = coefs, a = 1
# Polyphase implementation to avoid calculating values that are removed
# coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# polyCoefs = [[ 3, 7, 11],
# [ 2, 6, 10],
# [ 1, 5, 9],
# [ 0, 4, 8]])
pfs = PolyPhaseFirFilterStructure(Ndown, coefs, commutator='down')
polyCoefs = pfs.polyCoefs
# Poly phases for whole data signal x, prepended with Ndown - 1 zeros
polyX, Nx, Nxp = poly_structure_data_for_downsampling_whole_x(x, Ndown) # size (Ndown, Nxp), Ny = Nxp
# Filter x per polyphase, order in polyCoefs accounts for commutator [HARRIS Fig 6.12, 6.13]
polyY = np.zeros((Ndown, Nxp))
for p in range(Ndown):
polyY[p] = signal.lfilter(polyCoefs[p], a, polyX[p])
# Phase rotate per polyphase, due to delay line at branch inputs [HARRIS Eq 6.8]
polyYC = np.zeros((Ndown, Nxp), dtype='cfloat')
for p in range(Ndown):
polyYC[p] = polyY[p] * np.exp(1j * 2 * np.pi * (Ndown - 1 - p) * k / Ndown)
# Sum the branch outputs to get single downsampled and downconverted output value
y = np.sum(polyYC, axis=0)
if verbosity:
print('> downsample_bpf():')
print('. len(x) =', str(len(x)))
print('. Ndown =', str(Ndown))
print('. Nx =', str(Nx))
print('. Nxp =', str(Nxp))
print('. len(y) =', str(len(y))) # = Nxp
print('. Ndft =', str(Ndft))
print('. k =', str(k))
return y
def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate and decimate by Nup / Ndown
"""Resample x by factor U / D = Nup / Ndown
x[n] --> Nup --> v[m] --> LPF --> w[m] --> Ndown --> y[k]
The w[m] is the result of upsampling x[n] by Nup. The LPF has narrowed the pass band to suit both Nup and Ndown.
Therefore the downsampling by Ndown implies that only one in every Ndown samples of w[m] needs to be calculated
to get y[k] = w[k * D]. Hence resampling by Nup / Ndown takes less processing than upsampling when Ndown > 1.
The upsampling structure outputs the interpolated samples via the separate output phases. The same input sequence
x is used to calculate all Nup phases, whereby each phase uses a different set of coefficients from the LPF.
The downsampling structure sums the Ndown phases to output the decimated samples. Each phase uses a different set
of coefficients from the LPF to filter Ndown delay phases of the input sequence x.
Resampling is upsampling with downsampling by phase selection
The resampling is done by first upsampling and then downsampling, because then only one shareed LPF is needed.
For upsampling an LPF is always needed, because it has to construct the inserted Nup - 1 zero values. For
downsampling the LPF of the upsampling already has restricted the input band width (BW), provided that the LPF
has BW < fNyquist / U and BW < fNyquist / D. The downsampling therefore does not need an LPF and reduces to
selecting appropriate output phase of the upsampler. The upsampler then only needs to calculate the output phase
that will be used by the downsampler, instead of calculating all output phases.
Input:
. x: Input signal x[n]
. Nup: upsample (interpolation) factor
. Ndown: downsample (decimation) factor
. coefs: FIR filter coefficients for anti aliasing LPF
. verify: when True then verify that output y is the same when calculated directly or when calculated using the
polyphase implementation.
Return:
. y: Resampled output signal y[m]
Assumptions:
. x[n] = 0 for n < 0
. k = m // D = (n * U) // D, so first resampled output sample starts at k = 0 based on n = 0
. no y[k] for k < 0
. insert U - 1 zeros after each x[n] to get v[m], so len(v) = len(y) = U * len(x)
. use coefs as anti aliasing filter, must be LPF with BW < fNyquist / U and BW < fNyquist / D
. len(coefs) typically is multiple of Nup. If shorter, then the coefs are extended with zeros.
Remarks:
. The group delay is the same as for the upsampler, because the downsampling has no additional filtering.
"""
Nx = len(x)
Nv = Nup * Nx
Nyp = (Nv + Ndown - 1) // Ndown # Number of samples from v, per poly phase in polyY
Ny = 1 + Ndown * (Nyp - 1) # Used number of samples from v
# Upsampling with LPF
w = upsample(x, Nup, coefs, verify=False)
# Downsampling selector
# This model is not efficient, because the upsample() has calculated all Nup phases, whereas it only needs to
# calculate the phase that is selected by the downsampling. However, it is considered not worth the effort to
# make the model more efficient, because then only parts of lfilter() in upsample() need to be calulated, so
# then lfilter() needs to be implemented manually and possibly in a for loop [section 3.3.4 CROCHIERE].
downSelect = np.arange(0, Nyp) * Ndown
y = w[downSelect]
if verify:
# Inefficient implementation
# . Upsampling with multiply by zero values
v = np.zeros((Nup, Nx)) # poly phases
v[0] = x
v = v.T.reshape(1, Nup * Nx)[0] # upsample
# . LPF
w = signal.lfilter(coefs, a, v)
# . Downsampling with calculating values that are removed
yVerify = np.zeros(Ndown * Nyp)
yVerify[0 : Ny] = w[0 : Ny]
yVerify = yVerify.reshape(Nyp, Ndown).T # poly phases
yVerify = yVerify[0] # downsample
if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
print('PASSED: correct resample result')
else:
print('ERROR: wrong resample result')
print('. len(x) =', str(len(x)))
print('. Nx =', str(Nx))
print('. len(v) =', str(len(v)))
print('. Ny =', str(Ny))
print('. Nyp =', str(Nyp))
print('. len(y) =', str(len(y)))
return y