Skip to content
Snippets Groups Projects
Commit accff15c authored by Eric Kooistra's avatar Eric Kooistra
Browse files

Support dsp.resample().

parent 1ab6b56d
No related branches found
No related tags found
1 merge request!404Resolve RTSD-224
......@@ -463,77 +463,98 @@ class PolyPhaseFirFilterStructure:
"""Polyphase FIR filter structure (PFS) per block of data
Input:
. N : number of poly phases, is data block size
. T : number of taps per poly phase
. coefs: FIR coefficients b[[0 : TN - 1]]
0 ----------------> N-1
0 0, 1, ..., N-1 -- coefs ->/ <-- time --/
| N, N+1, ..., 2N-1 /-- coefs ->/ /<- time --/
| 2N, 2N+1, ..., 3N-1 /-- coefs ->/ /<- time --/
v ..., ..., ..., ... ... ...
T-1 (T-1)N, (T-1)N+1, ..., TN-1 /-- coefs -> /<- time --
Map LPF FIR coefs to zCoefs:
. zCoefs[0][0] = coefs[0] is first coefficient
. zCoefs[T-1][N-1] = coefs[TN-1] is last coefficient
. zCoefs[T][N] indices for T taps per phase and N phases
Filter delay memory zDelays:
. zDelays[0][0] is newest data
. zDelays[T-1][N-1] is oldest data
zDelays[0] is newest block of data
zDelays[T-1] is oldest block of data
. Nphases: number of poly phases, is data block size, is number of rows
(axis=0) in polyphase structure
. coefs: FIR coefficients b[0 : Nphases * Ntaps - 1]
. commutator: 'up' or 'down', commutator 'up' selects the polyCoefs from
phase 0 : Nphases - 1, commutator 'down' first flips the phases order
in polyCoefs.
Derived:
. Ntaps: number of taps per poly phase, is number of columns (axis=1) in
the polyphase structure
Map LPF FIR coefs to polyCoefs with:
. flip rows (axis=0) to have first coefs[0] at last row to fit time order
of data block with newest sample at last index. Flipping polyCoefs once
here at init is probably more efficient than flipping inData and outData
for every block.
. No need to flip the order of the Ntaps columns (axis=1), because the
filter sums the Ntaps.
Filter delay memory polyDelays:
. Shift in data blocks per column at column 0, last sample in data block
and in column is newest.
"""
def __init__(self, Nphases, coefs, commutator):
    """Initialise the polyphase FIR filter structure.

    Input:
    . Nphases: number of poly phases, is data block size, is number of rows
      (axis=0) in the polyphase structure
    . coefs: FIR coefficients b[0 : Nphases * Ntaps - 1]
    . commutator: 'up' or 'down'; 'up' keeps the phase order of poly_coeffs(),
      'down' first flips the phases order in polyCoefs
    """
    self.Nphases = Nphases
    self.coefs = coefs
    self.Ncoefs = len(coefs)
    # Number of taps per poly phase, is number of columns (axis=1) in the
    # polyphase structure. poly_coeffs() zero-pads the coefs when Ncoefs is
    # not a whole multiple of Nphases.
    self.Ntaps = ceil_div(self.Ncoefs, Nphases)
    # Example with coefs = [0, 1, ..., 11], Nphases = 4 rows (axis=0),
    # Ntaps = 3 columns (axis=1):
    # . commutator 'up' keeps the poly_coeffs() order:
    #     polyCoefs = [[ 0, 4, 8],
    #                  [ 1, 5, 9],
    #                  [ 2, 6, 10],
    #                  [ 3, 7, 11]]
    # . commutator 'down' flips the phases (rows, axis=0):
    #     polyCoefs = [[ 3, 7, 11],
    #                  [ 2, 6, 10],
    #                  [ 1, 5, 9],
    #                  [ 0, 4, 8]]
    self.polyCoefs = self.poly_coeffs()
    if commutator == 'down':
        self.polyCoefs = np.flip(self.polyCoefs, axis=0)
    # Filter delay memory polyDelays: shift in data blocks per column at
    # column 0, last sample in data block and in column is newest. E.g. for
    # input samples [0 : 15] shifted in as blocks of Nphases = 4, after four
    # blocks:
    #     polyDelays = [[ 8, 4, 0],
    #                   [ 9, 5, 1],
    #                   [10, 6, 2],
    #                   [11, 7, 3]]
    self.polyDelays = np.zeros((Nphases, self.Ntaps))
def poly_coeffs(self):
    """Reshape the 1D FIR coefficients into the Nphases x Ntaps polyphase structure.

    Input:
    . coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
    Return:
    . polyCoefs = [[ 0, 4, 8], # p = 0
                   [ 1, 5, 9], # p = 1
                   [ 2, 6, 10], # p = 2
                   [ 3, 7, 11]]) # p = 3, Nphases = 4 rows (axis=0)
                                     Ntaps = 3 columns (axis=1)
    If Ncoefs < Ntaps * Nphases, then pad polyCoefs with zeros.
    """
    # Zero-pad the coefficients up to a whole number of taps per phase.
    padded = np.zeros(self.Nphases * self.Ntaps)
    padded[:self.Ncoefs] = self.coefs
    # Reshape so that each row (axis=0) holds one poly phase p.
    return padded.reshape(self.Ntaps, self.Nphases).transpose()
def filter_block(self, inData):
    """Filter block of inData per poly phase

    Input:
    . inData: block of Nphases input data, time n increments, so inData[0]
      is oldest data and inData[Nphases-1] is newest data
    Return:
    . outData: block of Nphases filtered output data with outData[0] is
      oldest data and outData[Nphases-1] is newest data, so time n
      increments, like with inData
    """
    # Shift delay memory by one block: every column moves one tap position
    # to the right (oldest column wraps, but is overwritten below).
    self.polyDelays = np.roll(self.polyDelays, 1, axis=1)
    # Shift in inData block at column 0
    self.polyDelays[:, 0] = inData
    # Apply FIR coefs per element
    zData = self.polyDelays * self.polyCoefs
    # Sum the Ntaps per poly phase (per row, axis=1)
    outData = np.sum(zData, axis=1)
    return outData
......@@ -621,30 +642,35 @@ def upsample(x, Nup, coefs, verify=False): # interpolate
# [ 1, 5, 9], # p = 1
# [ 2, 6, 10], # p = 2
# [ 3, 7, 11]]) # p = 3
# If necessary pad coefs with zeros
Ntaps = ceil_div(Ncoefs, Nup)
polyCoefs = np.zeros(Nup * Ntaps)
polyCoefs[0:Ncoefs] = coefs
polyCoefs = polyCoefs.reshape(Ntaps, Nup).T
pfs = PolyPhaseFirFilterStructure(Nup, coefs, commutator='up')
polyCoefs = pfs.polyCoefs
# Poly phases for data
# . Use polyY for whole data signal x with Nx samples, instead of pfs.polyDelays for pfs.Ntaps, to be able to use
# signal.lfilter()
polyY = np.zeros((Nup, Nx))
# Filter x per polyphase, index p = 0, 1, .., Nup - 1 is the commutator in [Figure 10.13 in LYONS]
# Filter x per polyphase, index p = 0, 1, .., Nup - 1 is the commutator in [Figure 10.13, 10.23 in LYONS, 3.25
# HARRIS]
for p in range(Nup):
polyY[p] = signal.lfilter(polyCoefs[p], a, x)
y = polyY.T.reshape(1, Nup * Nx)[0]
if verify:
# Inefficient implementation with multiply by zero values
# Inefficient upsampling implementation with multiply by zero values
xZeros = np.zeros((Nup, Nx))
xZeros[0] = x
xZeros = xZeros.T.reshape(1, Nup * Nx)[0]
yVerify = signal.lfilter(coefs, a, xZeros)
if np.all(y == yVerify):
print('ERROR: wrong upsample result')
else:
xZeros = xZeros.T.reshape(1, Nup * Nx)[0] # upsample
yVerify = signal.lfilter(coefs, a, xZeros) # LPF
if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
print('PASSED: correct upsample result')
else:
print('ERROR: wrong upsample result')
print('Upsampling:')
print('. Nup = ' + str(Nup))
print('. Nx = ' + str(Nx))
print('. len(y) = ' + str(len(y)))
return y
......@@ -691,9 +717,9 @@ def downsample(x, Ndown, coefs, verify=False): # decimate
y[m]: 0 1 2 3 --> time m with unit Ts * D
Remove v[-3] = h[3] x[-3] at x[ 0]
Remove v[-2] = h[2] x[-2] at x[ 0]
Remove v[-1] = h[1] x[-1] at x[ 0]
Remove v[-3] = h[3] x[-3] at x[-3]
Remove v[-2] = h[2] x[-2] at x[-2]
Remove v[-1] = h[1] x[-1] at x[-1]
Calculate v[ 0] = h[0] x[ 0] at x[ 0]
Calculate v[ 1] = h[3] x[ 1] at x[ 1]
......@@ -724,8 +750,11 @@ def downsample(x, Ndown, coefs, verify=False): # decimate
v[n - 1] = lfilter(h[1, 5, 9], [1], polyX[2])
v[n - 0] = lfilter(h[0, 4, 8], [1], polyX[3])
"""
Nxp = 1 + (len(x) - 1) // Ndown # Number of samples from x per poly phase in polyX
Nx = 1 + Ndown * (Nxp - 1) # Used number of samples from x
# Number of samples from x per poly phase in polyX, prepended with Ndown - 1 zeros. Prepend Ndown - 1 zeros
# to have first down sampled sample at m = 0 start at n = 0.
Nxp = (Ndown - 1 + len(x)) // Ndown
# Used number of samples from x
Nx = 1 + Ndown * (Nxp - 1)
Ncoefs = len(coefs)
a = [1.0]
......@@ -735,35 +764,132 @@ def downsample(x, Ndown, coefs, verify=False): # decimate
# [ 2, 6, 10],
# [ 1, 5, 9],
# [ 0, 4, 8]])
# If necessary pad coefs with zeros
Ntaps = ceil_div(Ncoefs, Ndown)
polyCoefs = np.zeros(Ndown * Ntaps)
polyCoefs[0:Ncoefs] = coefs
polyCoefs = polyCoefs.reshape(Ntaps, Ndown).T
polyCoefs = np.flip(polyCoefs, axis=0)
pfs = PolyPhaseFirFilterStructure(Ndown, coefs, commutator='down')
polyCoefs = pfs.polyCoefs
# Poly phases for data
# . Use polyX for whole data signal x with Nx samples, instead of pfs.polyDelays for pfs.Ntaps, to be able to use
# signal.lfilter() and to be able to prepend Ndown - 1 zeros
polyX = np.zeros(Ndown * Nxp)
polyX[Ndown - 1] = x[0]
polyX[Ndown - 1] = x[0] # prepend x with Ndown - 1 zeros
polyX[Ndown:] = x[1 : Nx]
polyX = polyX.reshape(Nxp, Ndown).T
polyY = np.zeros((Ndown, Nxp))
# Filter x per polyphase, index p = 0, 1, .., Ndown - 1 is the commutator in [Figure 10.14 in LYONS]
# Filter x per polyphase, index p = 0, 1, .., Ndown - 1 is the commutator in [Figure 10.14, 10.22 in LYONS, 3.29
# HARRIS]
for p in range(Ndown):
polyY[p] = signal.lfilter(polyCoefs[p], a, polyX[p])
y = np.sum(polyY, axis=0)
if verify:
# Inefficient implementation with calculating values that are removed
# Inefficient down sampling implementation with calculating values that are removed
yVerify = np.zeros(Ndown * Nxp)
yVerify[0 : Nx] = signal.lfilter(coefs, a, x[0 : Nx]) # filter
yVerify[0 : Nx] = signal.lfilter(coefs, a, x[0 : Nx]) # LPF
yVerify = yVerify.reshape(Nxp, Ndown).T # poly phases
yVerify = yVerify[1] # downsample
if np.all(y == yVerify):
yVerify = yVerify[0] # downsample
if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
print('PASSED: correct downsample result')
else:
print('ERROR: wrong downsample result')
print('Downsampling:')
print('. len(x) = ' + str(len(x)))
print('. Ndown = ' + str(Ndown))
print('. Nx = ' + str(Nx))
print('. Nxp = ' + str(Nxp))
print('. len(y) = ' + str(len(y))) # = Nxp
return y
def resample(x, Nup, Ndown, coefs, verify=False):  # interpolate and decimate by Nup / Ndown
    """Resample x by factor I / D = Nup / Ndown

    x[n] --> Nup --> v[m] --> LPF --> w[m] --> Ndown --> y[k]

    The w[m] is the result of upsampling x[n] by Nup. The LPF has narrowed the pass band to suit both Nup and Ndown.
    Therefore the downsampling by Ndown implies that only one in every Ndown samples of w[m] needs to be calculated
    to get y[k] = w[k * D]. Hence resampling by Nup / Ndown takes less processing than upsampling when Ndown > 1.

    Poly phases in up and down sampling:
    The upsampling structure outputs the interpolated samples via the separate output phases. The same input sequence
    x is used to calculate all Nup phases, whereby each phase uses a different set of coefficients from the LPF.
    The downsampling structure sums the Ndown phases to output the decimated samples. Each phase uses a different set
    of coefficients from the LPF to filter Ndown delay phases of the input sequence x.

    Resampling is upsampling with down sampling by phase selection
    The resampling is done by first upsampling and then downsampling, because then only one shared LPF is needed.
    For upsampling an LPF is always needed, because it has to construct the inserted Nup - 1 zero values. For
    downsampling the LPF of the upsampling already has restricted the input band width (BW), provided that the LPF
    has BW < fNyquist / I and BW < fNyquist / D. The downsampling therefore does not need an LPF and reduces to
    selecting the appropriate output phase of the upsampler. The upsampler then only needs to calculate the output
    phase that will be used by the downsampler, instead of calculating all output phases.

    Input:
    . x: Input signal x[n]
    . Nup: upsample (interpolation) factor
    . Ndown: downsample (decimation) factor
    . coefs: FIR filter coefficients for antialiasing LPF
    . verify: when True then verify that output y is the same when calculated directly or when calculated using the
      polyphase implementation.
    Return:
    . y: Resampled output signal y[m]

    Assumptions:
    . x[n] = 0 for n < 0
    . k = m // D = (n * I) // D, so first resampled output sample starts at k = 0 based on n = 0
    . no y[k] for k < 0
    . insert I - 1 zeros after each x[n] to get v[m], so len(v) = len(y) = I * len(x)
    . use coefs as anti aliasing filter, must be LPF with BW < fNyquist / I and BW < fNyquist / D
    . len(coefs) typically is multiple of Nup. If shorter, then the coefs are extended with zeros.
    Remarks:
    . The group delay is the same as for the upsampler, because the downsampling has no additional filtering.
    """
    Nx = len(x)
    Nv = Nup * Nx  # number of samples after upsampling
    a = [1.0]  # FIR filter, so denominator is 1 (used in verify branch)
    Nyp = (Nv + Ndown - 1) // Ndown  # Number of samples from v, per poly phase in polyY
    Ny = 1 + Ndown * (Nyp - 1)  # Used number of samples from v
    # Upsampling with LPF
    w = upsample(x, Nup, coefs, verify=False)
    # Downsampling selector
    # This model is not efficient, because the upsample() has calculated all Nup phases, whereas it only needs to
    # calculate the phase that is selected by the downsampling. However, it is considered not worth the effort to
    # make the model more efficient, because then only parts of lfilter() in upsample() need to be calculated, so
    # then lfilter() needs to be implemented manually and possibly in a for loop [section 3.3.4 HARRIS].
    downSelect = np.arange(0, Nyp) * Ndown
    y = w[downSelect]
    if verify:
        # Inefficient implementation
        # . Upsampling with multiply by zero values
        v = np.zeros((Nup, Nx))  # poly phases
        v[0] = x
        v = v.T.reshape(1, Nup * Nx)[0]  # upsample
        # . LPF
        w = signal.lfilter(coefs, a, v)
        # . Downsampling with calculating values that are removed
        yVerify = np.zeros(Ndown * Nyp)
        yVerify[0 : Ny] = w[0 : Ny]
        yVerify = yVerify.reshape(Nyp, Ndown).T  # poly phases
        yVerify = yVerify[0]  # downsample
        if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
            print('PASSED: correct resample result')
        else:
            print('ERROR: wrong resample result')
        print('Resampling:')
        print('. len(x) = ' + str(len(x)))
        print('. Nx = ' + str(Nx))
        print('. len(v) = ' + str(len(v)))
        print('. Ny = ' + str(Ny))
        print('. Nyp = ' + str(Nyp))
        print('. len(y) = ' + str(len(y)))
    return y
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment