Skip to content
Snippets Groups Projects
Commit 3b63b541 authored by Eric Kooistra's avatar Eric Kooistra
Browse files

Add dsp.downsample().

parent 72484cbc
No related branches found
No related tags found
1 merge request!404Resolve RTSD-224
Pipeline #79422 passed
......@@ -544,13 +544,14 @@ def upsample(x, Nup, coefs, verify=False): # interpolate
. x: Input signal x[n]
. Nup: upsample (interpolation) factor
. coefs: FIR filter coefficients for antialiasing LPF
. verify: when True then verify that output y is the same when calculated directly or when calculatged using the
. verify: when True then verify that output y is the same when calculated directly or when calculated using the
polyphase implementation.
Return:
. y: Upsampled output signal y[m]
Assumptions:
. x[n] = 0 for n < 0
. m = n * I, so first upsampled output sample starts at m = 0 based on n = 0
. no y[m] for m < 0
. insert I - 1 zeros after each x[n] to get v[m], so len(v) = len(y) = I * len(x)
. use coefs as anti aliasing filter, must be LPF with BW < fNyquist / I
......@@ -559,8 +560,10 @@ def upsample(x, Nup, coefs, verify=False): # interpolate
Remarks:
. The input sample period is ts and the output sample period of the upsampled (= interpolated signal) is tsUp =
ts / I
. The group delay is (Ncoefs - 1) / 2 * tsUp. With odd Ncoefs and symmetrical coefs to have linear phase, the
group delay is an integer number of tsUp periods.
. The group delay is (Ncoefs - 1) / 2 * tsUp = (Ncoefs - 1) / I / 2 * ts. With odd Ncoefs and symmetrical coefs
to have linear phase, the group delay is an integer number of tsUp periods, so:
- when (Ncoefs - 1) % 2 == 0, then the group delay is an integer number of tsUp periods
- when (Ncoefs - 1) % (2 * I) == 0, then the group delay is an integer number of ts periods
Procedure:
......@@ -577,46 +580,57 @@ def upsample(x, Nup, coefs, verify=False): # interpolate
y[m]: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 --> time m with unit Ts / I
Calucate y[0, 1, 2, 3] at x[0]
Calculate y[0, 1, 2, 3] at x[0]
y[0] = h[0] x[0]
y[1] = h[1] x[0]
y[2] = h[2] x[0]
y[3] = h[3] x[0]
Calucate y[4, 5 ,6 ,7] at x[1]
Calculate y[4, 5 ,6 ,7] at x[1]
y[ 4] = h[0] x[1] + h[4] x[0]
y[ 5] = h[1] x[1] + h[5] x[0]
y[ 6] = h[2] x[1] + h[6] x[0]
y[ 7] = h[3] x[1] + h[7] x[0]
Calucate y[8, 9, 10, 11] at x[2]
Calculate y[8, 9, 10, 11] at x[2]
y[ 8] = h[0] x[2] + h[4] x[1] + h[ 8] x[0]
y[ 9] = h[1] x[2] + h[5] x[1] + h[ 9] x[0]
y[10] = h[2] x[2] + h[6] x[1] + h[10] x[0]
y[11] = h[3] x[2] + h[7] x[1] + h[11] x[0]
Calucate y[12, 13, 14, 15] at x[3], see '|' markers between v[m] (is zero padded x[n]) and h[k] above
Calculate y[12, 13, 14, 15] at x[3], see '|' markers between v[m] (is zero padded x[n]) and h[k] above
y[12] = h[0] x[3] + h[4] x[2] + h[ 8] x[1]
y[13] = h[1] x[3] + h[5] x[2] + h[ 9] x[1]
y[14] = h[2] x[3] + h[6] x[2] + h[10] x[1]
y[15] = h[3] x[3] + h[7] x[2] + h[11] x[1]
==> Calucate y[n * I + [0, ..., I - 1]] at x[n]
==> Calculate y[n * I + [0, ..., I - 1]] at x[n]
y[n * I + 0] = lfilter(h[0, 4, 8], [1], x)
y[n * I + 1] = lfilter(h[1, 5, 9], [1], x)
y[n * I + 2] = lfilter(h[2, 6,10], [1], x)
y[n * I + 3] = lfilter(h[3, 7,11], [1], x)
y[n * I + 0] = lfilter(h[0, 4, 8], [1], x) # p = 0
y[n * I + 1] = lfilter(h[1, 5, 9], [1], x) # p = 1
y[n * I + 2] = lfilter(h[2, 6,10], [1], x) # p = 2
y[n * I + 3] = lfilter(h[3, 7,11], [1], x) # p = 3
"""
Nx = len(x)
Ncoefs = len(coefs)
a = [1.0]
# Polyphase implementation to avoid multiply by zero values
Ntaps = Ncoefs // Nup
polyCoefs = coefs.reshape(Ntaps, Nup).T
# coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# polyCoefs = [[ 0, 4, 8], # p = 0
# [ 1, 5, 9], # p = 1
# [ 2, 6, 10], # p = 2
# [ 3, 7, 11]]) # p = 3
# If necessary pad coefs with zeros
Ntaps = ceil_div(Ncoefs, Nup)
polyCoefs = np.zeros(Nup * Ntaps)
polyCoefs[0:Ncoefs] = coefs
polyCoefs = polyCoefs.reshape(Ntaps, Nup).T
# Poly phases for data
polyY = np.zeros((Nup, Nx))
# Filter x per polyphase
# Filter x per polyphase, index p = 0, 1, .., Nup - 1 is the commutator in [Figure 10.13 in LYONS]
for p in range(Nup):
polyY[p] = signal.lfilter(polyCoefs[p], a, x)
y = polyY.T.reshape(1, Nup * Nx)[0]
......@@ -634,6 +648,125 @@ def upsample(x, Nup, coefs, verify=False): # interpolate
return y
def downsample(x, Ndown, coefs, verify=False): # decimate
"""Downsample x by factor D = Ndown up
Input:
. x: Input signal x[n]
. Ndown: downsample (decimation) factor
. coefs: FIR filter coefficients for antialiasing LPF
. verify: when True then verify that output y is the same when calculated directly or when calculated using the
polyphase implementation.
Return:
. y: Downsampled output signal y[m]
Assumptions:
. x[n] = 0 for n < 0
. m = n // I, so first downsampled output sample starts at m = 0 based on n = 0
. no y[m] for m < 0
. skip D - 1 values zeros after each x[n] to get y[m], so len(y) = len(x) // D
. use coefs as anti aliasing filter, must be LPF with BW < fNyquist / D
. len(coefs) typically is multiple of Ndown. If shorter, then the coefs are extended with zeros.
Remarks:
. The input sample period is ts and the output sample period of the downsampled (= decimated signal) is tsDown =
ts * D
. The group delay is (Ncoefs - 1) / 2 * ts. With odd Ncoefs and symmetrical coefs to have linear phase, the group
delay is an integer number of ts periods, so:
- when (Ncoefs - 1) % 2 == 0, then the group delay is an integer number of ts periods
- when (Ncoefs - 1) % (2 * D) == 0, then the group delay is an integer number of tsDown periods
Procedure:
x[n]: 0 1 2 3 4 5 6 7 8 9 10 11 12 --> time n with unit Ts
|
h[k]: 11 10 9 8 7 6 5 4 3 2 1 0 --> coefs for y[m] at m = 3
8 4 0 --> coefs for v[n] at n = 12
9 5 1 --> coefs for v[n] at n = 11
10 6 2 --> coefs for v[n] at n = 10
11 7 3 --> coefs for v[n] at n = 9
v[n]: 0 1 2 3 4 5 6 7 8 9 10 11 12
y[m]: 0 1 2 3 --> time m with unit Ts * D
Remove v[-3] = h[3] x[-3] at x[ 0]
Remove v[-2] = h[2] x[-2] at x[ 0]
Remove v[-1] = h[1] x[-1] at x[ 0]
Calculate v[ 0] = h[0] x[ 0] at x[ 0]
Calculate v[ 1] = h[3] x[ 1] at x[ 1]
Calculate v[ 2] = h[2] x[ 2] at x[ 2]
Calculate v[ 3] = h[1] x[ 3] at x[ 3]
Calculate v[ 4] = h[0] x[ 4] + h[4] x[0] at x[ 4]
Calculate v[ 5] = h[3] x[ 5] + h[7] x[1] at x[ 5]
Calculate v[ 6] = h[2] x[ 6] + h[6] x[2] at x[ 6]
Calculate v[ 7] = h[1] x[ 7] + h[5] x[3] at x[ 7]
Calculate v[ 8] = h[0] x[ 8] + h[4] x[4] + h[ 8] x[0] at x[ 8]
Calculate v[ 9] = h[3] x[ 9] + h[7] x[5] + h[11] x[1] at x[ 9]
Calculate v[10] = h[2] x[10] + h[6] x[6] + h[10] x[2] at x[10]
Calculate v[11] = h[1] x[11] + h[5] x[7] + h[ 9] x[3] at x[11]
Calculate v[12] = h[0] x[12] + h[4] x[8] + h[ 8] x[4] at x[12]
==> Calculate y[m] at sum v[n - p] at x[n] for n = m * D and p = 3, 2, 1, 0
Ndown-1
y[m] = sum v[m * D - p]
p = 0
with:
[n - p] [p]
v[n - 3] = lfilter(h[3, 7,11], [1], polyX[0])
v[n - 2] = lfilter(h[2, 6,10], [1], polyX[1])
v[n - 1] = lfilter(h[1, 5, 9], [1], polyX[2])
v[n - 0] = lfilter(h[0, 4, 8], [1], polyX[3])
"""
Nxp = 1 + (len(x) - 1) // Ndown # Number of samples from x per poly phase in polyX
Nx = 1 + Ndown * (Nxp - 1) # Used number of samples from x
Ncoefs = len(coefs)
a = [1.0]
# Polyphase implementation to avoid calculating values that are removed
# coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# polyCoefs = [[ 3, 7, 11],
# [ 2, 6, 10],
# [ 1, 5, 9],
# [ 0, 4, 8]])
# If necessary pad coefs with zeros
Ntaps = ceil_div(Ncoefs, Ndown)
polyCoefs = np.zeros(Ndown * Ntaps)
polyCoefs[0:Ncoefs] = coefs
polyCoefs = polyCoefs.reshape(Ntaps, Ndown).T
polyCoefs = np.flip(polyCoefs, axis=0)
# Poly phases for data
polyX = np.zeros(Ndown * Nxp)
polyX[Ndown - 1] = x[0]
polyX[Ndown:] = x[1 : Nx]
polyX = polyX.reshape(Nxp, Ndown).T
polyY = np.zeros((Ndown, Nxp))
# Filter x per polyphase, index p = 0, 1, .., Ndown - 1 is the commutator in [Figure 10.14 in LYONS]
for p in range(Ndown):
polyY[p] = signal.lfilter(polyCoefs[p], a, polyX[p])
y = np.sum(polyY, axis=0)
if verify:
# Inefficient implementation with calculating values that are removed
yVerify = np.zeros(Ndown * Nxp)
yVerify[0 : Nx] = signal.lfilter(coefs, a, x[0 : Nx]) # filter
yVerify = yVerify.reshape(Nxp, Ndown).T # poly phases
yVerify = yVerify[1] # downsample
if np.all(y == yVerify):
print('ERROR: wrong downsample result')
else:
print('PASSED: correct downsample result')
return y
###############################################################################
# Radix FFT
###############################################################################
......
Source diff could not be displayed: it is too large. Options to address this: view the blob.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment