Skip to content
Snippets Groups Projects
Commit 80d9ed3d authored by Reinier van der Walle's avatar Reinier van der Walle
Browse files

Merge branch 'RTSD-224' into 'master'

Resolve RTSD-224

Closes RTSD-224

See merge request !404
parents a4e59a0f 2a479ce1
Branches
No related tags found
1 merge request!404Resolve RTSD-224
Pipeline #80477 passed
......@@ -30,6 +30,7 @@
import numpy as np
from scipy import signal
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
import dsp_fpga_lib # from [2]
c_interpolate = 10
......@@ -168,6 +169,109 @@ def ideal_low_pass_filter(Npoints, Npass, bandEdgeGain=1.0):
return h, f, HF
def prototype_fir_low_pass_filter(
Npoints=1024, Ntaps=16, Ncoefs=1024*16, hpFactor=0.9, transitionFactor=0.4, stopRippleFactor=1000000, beta=1):
"""Derive FIR coefficients for prototype low pass filter using firls
Default use LPF specification for LOFAR subband filter. For subband filter
BWbin = fs / Npoints and the number of subbands is Nsub = Npoints / 2.
For multirate upsampling or or downsampling filter the Npoints is the
multirate factor Nup in upsampling or Ndown in downsampling. The fpass =
BWbin / 2 = fNyquist / Npoints, where fNyquist = fs / 2.
Use hpFactor < 1.0 to increase attentuation at fpass, to reduce disturbance
due to aliasing.
The number of prototype FIR filter coefficients Ncoefs <= Npoints * Ntaps.
For symmetrical-coefficient FIR filters the group delay is
(Ncoefs - 1) / 2, so choose Ncoefs is odd to have integers number of
samples output delay.
Normalize DC gain to 1.0.
Input:
- Npoints: Number of points of the DFT in the filterbank, or multirate
factor Nup in upsampling or Ndown in downsampling
- Ntaps: Number of taps per polyphase.
- Ncoefs: Actual number of prototype FIR filter coefficients, any extra
coefs are zero
- hpFactor : Half power bandwidth of the filter relative to BWbin
- transitionFactor: transition bandwidth factor relative to fpass
- stopRippleFactor: stopband ripple facotr relative to pass band ripple
- beta: When beta > 0 then apply Kaiser window on FIR coefficients
Return:
- hFirls: FIR coefficients calculated with firls and optional Kaiser window
- hInterpolated: Interpolated FIR coefficients for requested Ncoefs
"""
# LPF specification for subband filter
BWbin = 1 / Npoints # bandwidth of one bin (is subband frequency)
# Use half power bandwidth factor to tune half power cutoff frequency of LPF, default 1.0
BWpass = hpFactor * BWbin
fpass = BWpass / 2 # bin at DC: -fpass to +fpass
# Initial FIR filter length
# . Use interpolation of factor Q shorter filter to ensure the FIR filter design converges
# and to speed up calculation. N >> NFirlsMax = 1024 is not feasible.
# . The passband ripple and stopband attenuation depend on the transition bandwidth w_tb
# and the weight. Choose 0 ~< w_tb ~< 1.0 fpass, to ensure the FIR filter design converges
# and improve the passband ripple and stopband attenuation. A to large transition band
# also gives the design too much freedom and causes artifacts in the transition.
# . scipy firls() defines fpass relative to fs, so can use fpass as cutoff frequency.
NFirlsMax = 1024
Q = ceil_div(Ncoefs, NFirlsMax)
N = Ncoefs // Q
# If necessary -1 to make odd, because firls only supports odd number of FIR coefficients
if is_even(N):
N = N - 1
# Low pass transition band
f_pb = fpass * Q # pass band cut off frequency
w_tb = transitionFactor * fpass * Q # transition bandwidth
f_sb = f_pb + w_tb # stop band frequency
weight = [1, stopRippleFactor] # weight pass band ripple versus stop band ripple
print('f_pb = ', str(f_pb))
print('w_tb = ', str(w_tb))
print('f_sb = ', str(f_sb))
hFirls = signal.firls(N, [0, f_pb, f_sb, 0.5], [1, 1, 0, 0], weight, fs=1.0)
# Apply Kaiser window with beta = 1 like in pfs_coeff_final.m, this improves the
# stopband attenuation near the transition band somewhat
# . beta: 0 rect, 5 hamming, 6 hanning
if beta:
win = signal.windows.kaiser(N, beta=1)
hFirls *= win
# Normalize DC gain
dcSum = np.sum(hFirls)
hFirls *= 1.0 / dcSum
# Symmetrical FIR coeffients for linear phase
print('hFirls:')
print('. f_pb = %f' % f_pb)
print('. w_tb = %f' % w_tb)
print('. f_sb = %f' % f_sb)
print('. beta = %f' % beta)
print('. Q = %d' % Q)
print('. N = %d' % len(hFirls))
print('. DC sum = %f' % np.sum(hFirls))
print('. Symmetrical coefs = %s' % is_symmetrical(hFirls))
if len(hFirls) == Ncoefs:
return hFirls
else:
# Use Fourier interpolation to create final FIR filter coefs when
# Q > 1 or when Ncoefs is even
HFfirls = np.fft.fft(hFirls)
hInterpolated = fourier_interpolate(HFfirls, Ncoefs)
print('hInterpolated:')
print('. Ncoefs = %d' % len(hInterpolated))
print('. DC sum = %f' % np.sum(hInterpolated))
print('. Symmetrical coefs = %s' % is_symmetrical(hInterpolated))
return hInterpolated
def fourier_interpolate(HFfilter, Ncoefs):
"""Use Fourier interpolation to create final FIR filter coefs.
......@@ -218,7 +322,7 @@ def fourier_interpolate(HFfilter, Ncoefs):
if np.allclose(hInterpolated.imag, np.zeros(Ncoefs), rtol=c_rtol, atol=c_atol):
print('hInterpolated.imag ~= 0')
else:
print('WARNING: hInterpolated.imag != 0 (max(abs) = %f)' % np.max(np.abs(hInterpolated.imag)))
print('WARNING: hInterpolated.imag != 0 (max(abs) = %e)' % np.max(np.abs(hInterpolated.imag)))
return hInterpolated.real
......@@ -351,6 +455,457 @@ def dtft(coefs, Ndtft=None, zeroCenter=True, fftShift=True):
return h, f, HF
###############################################################################
# Polyphase filter
###############################################################################
class PolyPhaseFirFilterStructure:
"""Polyphase FIR filter structure (PFS) per block of data
Input:
. Nphases: number of poly phases, is data block size, is number of rows
(axis=0) in polyphase structure
. coefs: FIR coefficients b[0 : Nphases * Ntaps - 1]
. commutator: 'up' or 'down', commutator 'up' selects the polyCoefs from
phase 0 : Nphases - 1, commutator 'down' first flips the phases order
in polyCoefs.
Derived:
. Ntaps: number of taps per poly phase, is number of columns (axis=1) in
the polyphase structure
Map LPF FIR coefs to polyCoefs with:
. flip rows (axis=0) to have first coefs[0] at last row to fit time order
of data block with newest sample at last index. Flipping polyCoefs once
here at init is probably more efficient, then flipping inData and outData
for every block.
. No need to flip the order of the Ntaps columns (axis=1), because the
filter sums the Ntaps.
Filter delay memory polyDelays:
. Shift in data blocks per column at column 0, last sample in data block
and in column is newest.
"""
def __init__(self, Nphases, coefs, commutator):
self.Nphases = Nphases
self.coefs = coefs
self.Ncoefs = len(coefs)
self.Ntaps = ceil_div(self.Ncoefs, Nphases)
# coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# polyCoefs = [[ 3, 7, 11],
# [ 2, 6, 10],
# [ 1, 5, 9],
# [ 0, 4, 8]]), Nphases = 4 rows (axis=0)
# Ntaps = 3 columns (axis=1)
self.polyCoefs = self.poly_coeffs()
if commutator == 'down':
self.polyCoefs = np.flip(self.polyCoefs, axis=0)
# oldest sample[0]
# newest sample [15]
# samples [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
# blocks [ 0, 1, 2, 3]
# shift blocks to next column in polyDelays
# put newest block[3] in first column of polyDelays
# polyDelays = [[ 8, 4, 0],
# [ 9, 5, 1],
# [10, 6, 2],
# [11, 7, 3]])
self.polyDelays = np.zeros((Nphases, self.Ntaps))
def poly_coeffs(self):
"""Polyphase structure for FIR filter coefficients coefs in Nphases
Input:
. coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
Return:
. polyCoefs = [[ 0, 4, 8], # p = 0
[ 1, 5, 9], # p = 1
[ 2, 6, 10], # p = 2
[ 3, 7, 11]]) # p = 3, Nphases = 4 rows (axis=0)
Ntaps = 3 columns (axis=1)
If Ncoefs < Ntaps * Nphases, then pad polyCoefs with zeros.
"""
Ncoefs = self.Ncoefs
Nphases = self.Nphases
Ntaps = self.Ntaps
polyCoefs = np.zeros(Nphases * Ntaps)
polyCoefs[0:Ncoefs] = self.coefs
polyCoefs = polyCoefs.reshape(Ntaps, Nphases).T
return polyCoefs
def filter_block(self, inData):
"""Filter block of inData per poly phase
Input:
. inData: block of Nphases input data, time n increments, so inData[0] is
oldest data and inData[Nphases-1] is newest data
Return:
. outData: block of Nphases filtered output data with outData[0] is
oldest data and outData[Nphases-1] is newest data, so time n
increments, like with inData
"""
Ntaps = self.Ntaps
# Shift delay memory by one block
self.polyDelays = np.roll(self.polyDelays, 1, axis=1)
# Shift in inData block at column 0
self.polyDelays[:, 0] = inData
# Apply FIR coefs per element
zData = self.polyDelays * self.polyCoefs
# Sum per poly phase
outData = np.sum(zData, axis=1)
return outData
def upsample(x, Nup, coefs, verify=False): # interpolate
"""Upsample x by factor I = Nup
Input:
. x: Input signal x[n]
. Nup: upsample (interpolation) factor
. coefs: FIR filter coefficients for antialiasing LPF
. verify: when True then verify that output y is the same when calculated directly or when calculated using the
polyphase implementation.
Return:
. y: Upsampled output signal y[m]
Assumptions:
. x[n] = 0 for n < 0
. m = n * I, so first upsampled output sample starts at m = 0 based on n = 0
. no y[m] for m < 0
. insert I - 1 zeros after each x[n] to get v[m], so len(v) = len(y) = I * len(x)
. use coefs as anti aliasing filter, must be LPF with BW < fNyquist / I
. len(coefs) typically is multiple of Nup. If shorter, then the coefs are extended with zeros.
Remarks:
. The input sample period is ts and the output sample period of the upsampled (= interpolated signal) is tsUp =
ts / I
. The group delay is (Ncoefs - 1) / 2 * tsUp = (Ncoefs - 1) / I / 2 * ts. With odd Ncoefs and symmetrical coefs
to have linear phase, the group delay is an integer number of tsUp periods, so:
- when (Ncoefs - 1) % 2 == 0, then the group delay is an integer number of tsUp periods
- when (Ncoefs - 1) % (2 * I) == 0, then the group delay is an integer number of ts periods
Procedure:
x[n]: 0 1 2 3 --> time n with unit Ts
v[m] = x[m / I], for m = 0, +-I, +-2I, ...
= 0, else
v[m]: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 --> time m with unit Ts / I
| | | |
h[k]: 11 10 9 8 7 6 5 4 3 2 1 0 | | | --> coefs for m = 12
11 10 9 8 7 6 5 4 3 2 1 0 | | --> coefs for m = 13
11 10 9 8 7 6 5 4 3 2 1 0 | --> coefs for m = 14
11 10 9 8 7 6 5 4 3 2 1 0 --> coefs for m = 15
y[m]: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 --> time m with unit Ts / I
Calculate y[0, 1, 2, 3] at x[0]
y[0] = h[0] x[0]
y[1] = h[1] x[0]
y[2] = h[2] x[0]
y[3] = h[3] x[0]
Calculate y[4, 5 ,6 ,7] at x[1]
y[ 4] = h[0] x[1] + h[4] x[0]
y[ 5] = h[1] x[1] + h[5] x[0]
y[ 6] = h[2] x[1] + h[6] x[0]
y[ 7] = h[3] x[1] + h[7] x[0]
Calculate y[8, 9, 10, 11] at x[2]
y[ 8] = h[0] x[2] + h[4] x[1] + h[ 8] x[0]
y[ 9] = h[1] x[2] + h[5] x[1] + h[ 9] x[0]
y[10] = h[2] x[2] + h[6] x[1] + h[10] x[0]
y[11] = h[3] x[2] + h[7] x[1] + h[11] x[0]
Calculate y[12, 13, 14, 15] at x[3], see '|' markers between v[m] (is zero padded x[n]) and h[k] above
y[12] = h[0] x[3] + h[4] x[2] + h[ 8] x[1]
y[13] = h[1] x[3] + h[5] x[2] + h[ 9] x[1]
y[14] = h[2] x[3] + h[6] x[2] + h[10] x[1]
y[15] = h[3] x[3] + h[7] x[2] + h[11] x[1]
==> Calculate y[n * I + [0, ..., I - 1]] at x[n]
y[n * I + 0] = lfilter(h[0, 4, 8], [1], x) # p = 0
y[n * I + 1] = lfilter(h[1, 5, 9], [1], x) # p = 1
y[n * I + 2] = lfilter(h[2, 6,10], [1], x) # p = 2
y[n * I + 3] = lfilter(h[3, 7,11], [1], x) # p = 3
"""
Nx = len(x)
Ncoefs = len(coefs)
a = [1.0]
# Polyphase implementation to avoid multiply by zero values
# coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# polyCoefs = [[ 0, 4, 8], # p = 0
# [ 1, 5, 9], # p = 1
# [ 2, 6, 10], # p = 2
# [ 3, 7, 11]]) # p = 3
pfs = PolyPhaseFirFilterStructure(Nup, coefs, commutator='up')
polyCoefs = pfs.polyCoefs
# Poly phases for data
# . Use polyY for whole data signal x with Nx samples, instead of pfs.polyDelays for pfs.Ntaps, to be able to use
# signal.lfilter()
polyY = np.zeros((Nup, Nx))
# Filter x per polyphase, index p = 0, 1, .., Nup - 1 is the commutator in [Figure 10.13, 10.23 in LYONS, 3.25
# HARRIS]
for p in range(Nup):
polyY[p] = signal.lfilter(polyCoefs[p], a, x)
y = polyY.T.reshape(1, Nup * Nx)[0]
if verify:
# Inefficient upsampling implementation with multiply by zero values
xZeros = np.zeros((Nup, Nx))
xZeros[0] = x
xZeros = xZeros.T.reshape(1, Nup * Nx)[0] # upsample
yVerify = signal.lfilter(coefs, a, xZeros) # LPF
if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
print('PASSED: correct upsample result')
else:
print('ERROR: wrong upsample result')
print('Upsampling:')
print('. Nup = ' + str(Nup))
print('. Nx = ' + str(Nx))
print('. len(y) = ' + str(len(y)))
return y
def downsample(x, Ndown, coefs, verify=False): # decimate
"""Downsample x by factor D = Ndown up
Input:
. x: Input signal x[n]
. Ndown: downsample (decimation) factor
. coefs: FIR filter coefficients for antialiasing LPF
. verify: when True then verify that output y is the same when calculated directly or when calculated using the
polyphase implementation.
Return:
. y: Downsampled output signal y[m]
Assumptions:
. x[n] = 0 for n < 0
. m = n // I, so first downsampled output sample starts at m = 0 based on n = 0
. no y[m] for m < 0
. skip D - 1 values zeros after each x[n] to get y[m], so len(y) = len(x) // D
. use coefs as anti aliasing filter, must be LPF with BW < fNyquist / D
. len(coefs) typically is multiple of Ndown. If shorter, then the coefs are extended with zeros.
Remarks:
. The input sample period is ts and the output sample period of the downsampled (= decimated signal) is tsDown =
ts * D
. The group delay is (Ncoefs - 1) / 2 * ts. With odd Ncoefs and symmetrical coefs to have linear phase, the group
delay is an integer number of ts periods, so:
- when (Ncoefs - 1) % 2 == 0, then the group delay is an integer number of ts periods
- when (Ncoefs - 1) % (2 * D) == 0, then the group delay is an integer number of tsDown periods
Procedure:
x[n]: 0 1 2 3 4 5 6 7 8 9 10 11 12 --> time n with unit Ts
|
h[k]: 11 10 9 8 7 6 5 4 3 2 1 0 --> coefs for y[m] at m = 3
8 4 0 --> coefs for v[n] at n = 12
9 5 1 --> coefs for v[n] at n = 11
10 6 2 --> coefs for v[n] at n = 10
11 7 3 --> coefs for v[n] at n = 9
v[n]: 0 1 2 3 4 5 6 7 8 9 10 11 12
y[m]: 0 1 2 3 --> time m with unit Ts * D
Remove v[-3] = h[3] x[-3] at x[-3]
Remove v[-2] = h[2] x[-2] at x[-2]
Remove v[-1] = h[1] x[-1] at x[-1]
Calculate v[ 0] = h[0] x[ 0] at x[ 0]
Calculate v[ 1] = h[3] x[ 1] at x[ 1]
Calculate v[ 2] = h[2] x[ 2] at x[ 2]
Calculate v[ 3] = h[1] x[ 3] at x[ 3]
Calculate v[ 4] = h[0] x[ 4] + h[4] x[0] at x[ 4]
Calculate v[ 5] = h[3] x[ 5] + h[7] x[1] at x[ 5]
Calculate v[ 6] = h[2] x[ 6] + h[6] x[2] at x[ 6]
Calculate v[ 7] = h[1] x[ 7] + h[5] x[3] at x[ 7]
Calculate v[ 8] = h[0] x[ 8] + h[4] x[4] + h[ 8] x[0] at x[ 8]
Calculate v[ 9] = h[3] x[ 9] + h[7] x[5] + h[11] x[1] at x[ 9]
Calculate v[10] = h[2] x[10] + h[6] x[6] + h[10] x[2] at x[10]
Calculate v[11] = h[1] x[11] + h[5] x[7] + h[ 9] x[3] at x[11]
Calculate v[12] = h[0] x[12] + h[4] x[8] + h[ 8] x[4] at x[12]
==> Calculate y[m] at sum v[n - p] at x[n] for n = m * D and p = 3, 2, 1, 0
Ndown-1
y[m] = sum v[m * D - p]
p = 0
with:
[n - p] [p]
v[n - 3] = lfilter(h[3, 7,11], [1], polyX[0])
v[n - 2] = lfilter(h[2, 6,10], [1], polyX[1])
v[n - 1] = lfilter(h[1, 5, 9], [1], polyX[2])
v[n - 0] = lfilter(h[0, 4, 8], [1], polyX[3])
"""
# Number of samples from x per poly phase in polyX, prepended with Ndown - 1 zeros. Prepend Ndown - 1 zeros
# to have first down sampled sample at m = 0 start at n = 0.
Nxp = (Ndown - 1 + len(x)) // Ndown
# Used number of samples from x
Nx = 1 + Ndown * (Nxp - 1)
Ncoefs = len(coefs)
a = [1.0]
# Polyphase implementation to avoid calculating values that are removed
# coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# polyCoefs = [[ 3, 7, 11],
# [ 2, 6, 10],
# [ 1, 5, 9],
# [ 0, 4, 8]])
pfs = PolyPhaseFirFilterStructure(Ndown, coefs, commutator='down')
polyCoefs = pfs.polyCoefs
# Poly phases for data
# . Use polyX for whole data signal x with Nx samples, instead of pfs.polyDelays for pfs.Ntaps, to be able to use
# signal.lfilter() and to be able to prepend Ndown - 1 zeros
polyX = np.zeros(Ndown * Nxp)
polyX[Ndown - 1] = x[0] # prepend x with Ndown - 1 zeros
polyX[Ndown:] = x[1 : Nx]
polyX = polyX.reshape(Nxp, Ndown).T
polyY = np.zeros((Ndown, Nxp))
# Filter x per polyphase, index p = 0, 1, .., Ndown - 1 is the commutator in [Figure 10.14, 10.22 in LYONS, 3.29
# HARRIS]
for p in range(Ndown):
polyY[p] = signal.lfilter(polyCoefs[p], a, polyX[p])
y = np.sum(polyY, axis=0)
if verify:
# Inefficient down sampling implementation with calculating values that are removed
yVerify = np.zeros(Ndown * Nxp)
yVerify[0 : Nx] = signal.lfilter(coefs, a, x[0 : Nx]) # LPF
yVerify = yVerify.reshape(Nxp, Ndown).T # poly phases
yVerify = yVerify[0] # downsample
if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
print('PASSED: correct downsample result')
else:
print('ERROR: wrong downsample result')
print('Downsampling:')
print('. len(x) = ' + str(len(x)))
print('. Ndown = ' + str(Ndown))
print('. Nx = ' + str(Nx))
print('. Nxp = ' + str(Nxp))
print('. len(y) = ' + str(len(y))) # = Nxp
return y
def resample(x, Nup, Ndown, coefs, verify=False): # interpolate and decimate by Nup / Ndown
"""Resample x by factor I / D = Nup / Ndown
x[n] --> Nup --> v[m] --> LPF --> w[m] --> Ndown --> y[k]
The w[m] is the result of upsampling x[n] by Nup. The LPF has narrowed the pass band to suit both Nup and Ndown.
Therefore the downsampling by Ndown implies that only one in every Ndown samples of w[m] needs to be calculated
to get y[k] = w[k * D]. Hence resampling by Nup / Ndown takes less processing than upsampling when Ndown > 1.
Poly phases in up and down sampling:
The upsampling structure outputs the interpolated samples via the separate output phases. The same input sequence
x is used to calculate all Nup phases, whereby each phase uses a different set of coefficients from the LPF.
The downsampling structure sums the Ndown phases to output the decimated samples. Each phase uses a different set
of coefficients from the LPF to filter Ndown delay phases of the input sequence x.
Resampling is upsampling with down sampling by phase selection
The resampling is done by first upsampling and then downsampling, because then only one shareed LPF is needed.
For upsampling an LPF is always needed, because it has to construct the inserted Nup - 1 zero values. For
downsampling the LPF of the upsampling already has restricted the input band width (BW), provided that the LPF
has BW < fNyquist / I and BW < fNyquist / D. The downsampling therefore does not need an LPF and reduces to
selecting appropriate output phase of the upsampler. The upsampler then only needs to calculate the output phase
that will be used by the downsampler, instead of calculating all output phases.
Input:
. x: Input signal x[n]
. Nup: upsample (interpolation) factor
. Ndown: downsample (decimation) factor
. coefs: FIR filter coefficients for antialiasing LPF
. verify: when True then verify that output y is the same when calculated directly or when calculated using the
polyphase implementation.
Return:
. y: Resampled output signal y[m]
Assumptions:
. x[n] = 0 for n < 0
. k = m // D = (n * I) // D, so first resampled output sample starts at k = 0 based on n = 0
. no y[k] for k < 0
. insert I - 1 zeros after each x[n] to get v[m], so len(v) = len(y) = I * len(x)
. use coefs as anti aliasing filter, must be LPF with BW < fNyquist / I and BW < fNyquist / D
. len(coefs) typically is multiple of Nup. If shorter, then the coefs are extended with zeros.
Remarks:
. The group delay is the same as for the upsampler, because the downsampling has no additional filtering.
"""
Nx = len(x)
Nv = Nup * Nx
Ncoefs = len(coefs)
a = [1.0]
Nyp = (Nv + Ndown - 1) // Ndown # Number of samples from v, per poly phase in polyY
Ny = 1 + Ndown * (Nyp - 1) # Used number of samples from v
# Upsampling with LPF
w = upsample(x, Nup, coefs, verify=False)
# Downsampling selector
# This model is not efificent, because the upsample() has calculated all Nup phases, whereas it only needs to
# calculate the phase that is selected by the downsampling. However, it is considered not worth the effort to
# make the model more efficient, because then only parts of lfilter() in upsample() need to be calulated, so
# then lfilter() needs to be implemented manually and possibly in a for loop {section 3.3.4 HARRIS].
downSelect = np.arange(0, Nyp) * Ndown
y = w[downSelect]
if verify:
# Inefficient implementation
# . Upsampling with multiply by zero values
v = np.zeros((Nup, Nx)) # poly phases
v[0] = x
v = v.T.reshape(1, Nup * Nx)[0] # upsample
# . LPF
w = signal.lfilter(coefs, a, v)
# . Downsampling with calculating values that are removed
yVerify = np.zeros(Ndown * Nyp)
yVerify[0 : Ny] = w[0 : Ny]
yVerify = yVerify.reshape(Nyp, Ndown).T # poly phases
yVerify = yVerify[0] # downsample
if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
print('PASSED: correct resample result')
else:
print('ERROR: wrong resample result')
print('Resampling:')
print('. len(x) = ' + str(len(x)))
print('. Nx = ' + str(Nx))
print('. len(v) = ' + str(len(v)))
print('. Ny = ' + str(Ny))
print('. Nyp = ' + str(Nyp))
print('. len(y) = ' + str(len(y)))
return y
###############################################################################
# Radix FFT
###############################################################################
# def radix_fft():
# """Calculate DFT using smaller DFTs
#
# Use DFTs with radices equal to the greatest common dividers (gcd) of the
# input DFT size.
#
# https://stackoverflow.com/questions/9940192/how-to-calculate-a-large-size-fft-using-smaller-sized-ffts
# """
###############################################################################
# Plotting
###############################################################################
......@@ -564,3 +1119,16 @@ def plot_two_power_spectra(f1, HF1, name1, f2, HF2, name2, fs=1.0, fLim=None, db
if dbLim:
plt.ylim(dbLim)
plt.grid(True)
def plot_colorbar(plt):
# Work around using make_axes_locatable() to avoid:
# /tmp/ipykernel_9570/3561080291.py:12: MatplotlibDeprecationWarning: Auto-removal of grids by
# pcolor() and pcolormesh() is deprecated since 3.5 and will be removed two minor releases
# later; please call grid(False) first.
# Using plt.grid(False) is not sufficient to avoid the warning.
ax = plt.gca()
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size='5%', pad=0.1)
cax.grid(False)
plt.colorbar(cax=cax)
......@@ -210,9 +210,16 @@
y[n] = b[0]x[n] + b[1]x[n-1] + ... + b[N-1] x[n-(N-1)]
- Unit delay y[n] = x[n-1] --> Y(z) = z^-1 X(z). Therefore FIR z-transform
of difference equation is [LYONS 6.4.1, JOS2 6.5, HARRIS 6.3]:
Y(z) M
H(z) = ---- = sum bk z^-k = b0 + b1 z^-1 + b2 z^-2 + ... + bM z^-M
X(z) k=0
- Convolution in time domain is equivalent to multiplication in frequency
domain:
y(n) = h(k) * x(n) ==> DFT ==> Y(m) = H(m) X(m)
y(n) = h(k) * x(n) ==> DFT ==> Y(z) = H(z) X(z)
For DFT this is circular convolution. With suffcient zero padding N >= len(the
circular convolution can calculate the linear convolution:
......@@ -445,20 +452,25 @@
with polar form z = r exp(j w), w = -pi to +pi, w = 0 at z = 1 + 0j.
- Unit delay y[n] = x[n-1] --> Y(z) = z^-1 X(z). Therefore IIR z-transform
of difference equation is [LYONS 6.4.1 uses -ak, JOS2 6.5 uses +ak in H(z)]:
of difference equation is [LYONS 6.4.1 uses -ak, JOS2 6.5 and
scipy.signal.freqz use +ak in H(z)]:
Y(z) M N
H(z) = ---- = sum bk z^-k / (a0 + sum ak z^-k)
X(z) k=0 k=1
. scipy.signal.freqz plots frequency response along unit circle
z = exp(j w) defines ak for k > 1 as negative
b0 + b1 z^-1 + b2 z^-2 + ... + bM z^-M
= --------------------------------------
a0 + a1 z^-1 + a2 z^-2 + ... + aN z^-N
. scipy.signal.freqz plots frequency response of H(z) along unit circle
z = exp(j w) defines ak for k > 0 as negative to have +ak in H(z)
. analytic continuation means if H0(z) = H1(z) for z = exp(jw), then
H0(z) = H1(z) for all z [VAIDYANATHAN 2.4.3].
- biquad (= second-order section = sos):
Y(z) b0 + b1 z^-1 + b1 z^-2 (z - z0)(z - z1)
Y(z) b0 + b1 z^-1 + b2 z^-2 (z - z0)(z - z1)
H(z) = ---- = ---------------------- = K ----------------
X(z) a0 + a1 z^-1 + a2 z^-2 (p - p0)(p - p1)
......@@ -587,6 +599,7 @@
W_N = exp(-j 2pi / N) is primitive Nth root of unity
W_N^k = exp(-j 2pi k / N)
W_N^kn = exp(-j 2pi k / N * n) = exp(-j w_k * t_n)
. N is number of time samples and number of frequency point
. w_k = k 2pi fs / N
. t_n = n Ts
. fs / N is the frequency sampling interval in Hz
......@@ -635,6 +648,11 @@
X(w_k) = X(k) = sum x(n) W_N^kn
n=0 exp(-j w_k t_n)
exp(-j 2pi k n / N)
with:
. N is number of time samples and number of frequency point
. w_k = k 2pi fs / N
. t_n = n Ts
Inverse DFT:
N-1
......@@ -643,6 +661,9 @@
with:
. s_k(n) = exp(+j w_k t_n)
The DFT uses exp(-j w) and the IDFT uses exp(+j w), so applying IDFT on x(n)
will also result in a frequency domain representation.
- Matrix formulation, DFT as linear transformation [JOS1, PROAKIS 5.1.3]:
DFT:
......@@ -651,9 +672,13 @@
|X(0) | |1 1 1 ... 1 | |x(0) |
|X(1) | |1 W_N W_N^2 ... W_N^(N-1) | |x(1) |
|X(2) | = |1 W_N^2 W_N^4 ... W_N^2(N-1) | |x(2) |
|X(3) | = |1 W_N^3 W_N^6 ... W_N^3(N-1) | |x(3) |
|... | |... ... | |... |
|X(N-1)| |1 W_N^(N-1) W_N^2(N-1) ... W_N^(N-1)(N-1)| |x(N-1)|
DFT matrix is Square matrix, because N is number of time samples in x(n) and
number of frequency points in X(k).
IDFT:
xN = WN^-1 XN = 1/N conj(WN) XN, so
......@@ -812,6 +837,23 @@ LPF + downsampling = decimation:
- Do not calculate samples that will be thrown away.
- Discarding samples folds the spectrum, first the LPF has to remove all
folds.
- Sequence w(m) is an upsampled-by-D version of sequence x(n), and sequence
x(n) is a downsampled-by-D version of sequence w(m) [LYONS 10.9].
. Downsampling: W(z) = X(z^D)
w(m) = x(m / D), when m is multiple of D else 0
+inf +inf +inf
W(z) = sum w(m) z^-m = sum w(Dk) z^-Dk = sum x(k) z^-Dk = X(z^D)
m=-inf k=-inf k=-inf
. Upsampling: W(z^1/D) = X(z)
w(u) = x(m), when u is Dm else 0
+inf +inf +inf
W(z) = sum w(u) z^-u = sum w(Dm) z^-Dm = sum x(m) z^-Dm = X(z^D)
u=-inf m=-inf m=-inf
Upsampling + LPF = interpolation:
- Do not calculate samples that will be inserted as zeros.
......
Source diff could not be displayed: it is too large. Options to address this: view the blob.
%% Cell type:markdown id:6e0a005d tags:
 
# Try Hilbert transform application
 
Author: Eric Kooistra, dec 2023
 
Purpose:
* Practise DSP [1].
 
The HT is used to determine the envelope and frequency of an amplitude modulated (AM) signal, similar as in scpy.signal.hilbert example. With square (= step) or impulse wave AM there occurs ringing at the transition with same shape as the impule response of the HT. The ringing is not due to the chirp, because it also occurs with a sin wave carrier.
 
References:
 
1. dsp_study_erko, summary of DSP books
 
%% Cell type:code id:3563bc63 tags:
 
``` python
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
```
 
%% Cell type:code id:f820b0ac tags:
 
``` python
import dsp
```
 
%% Cell type:code id:a131b5b6 tags:
 
``` python
import importlib
importlib.reload(dsp)
```
 
%% Output
 
<module 'dsp' from '/dop466_0/kooistra/git/hdl/applications/lofar2/model/pfb_os/dsp.py'>
 
%% Cell type:code id:e38f8974 tags:
 
``` python
# Fixed constants:
fadc = 200 # MHz LOFAR
fnyquist = fadc / 2
fcarrier = 50
 
# User parameters:
# . General
Ntaps = 63
duration = 4
amPeriods = 3
 
# . Carrier wave
carrier = 'sin'
carrier = 'chirp'
 
# . Amplitude modulation
modulation = 'sin'
#modulation = 'step'
#modulation = 'impulse'
 
# HT design method
method = 'time domain'
method = 'complex BPF'
 
# . Complex BPF bandwidth
halfband = True
halfband = False
```
 
%% Cell type:code id:9b0f2098 tags:
 
``` python
# Create a chirp of which the frequency increases from 20 Hz to 80 Hz and apply an amplitude modulation.
Nsamples = int(fadc * duration)
t = np.arange(Nsamples) / fadc
if carrier == 'chirp':
x = signal.chirp(t, 0.1 * fnyquist, t[-1], 0.9 * fnyquist)
else:
x = np.sin(2.0 * np.pi * fcarrier * t)
sin = np.sin(2.0 * np.pi * amPeriods / duration * t)
if modulation == 'sin':
am = 1.0 + 0.5 * sin # sine wave amlitude modulation
elif modulation == 'step':
am = 1.0 + 0.5 * dsp.one_bit_quantizer(sin) # square wave amlitude modulation
else:
am = 1.0 + 5 * dsp.impulse_at_zero_crossing(sin) # impulse amplitude modulation
x *= am
 
plt.plot(am)
```
 
%% Output
 
[<matplotlib.lines.Line2D at 0x7f161473cbb0>]
[<matplotlib.lines.Line2D at 0x7f4f81cdfd60>]
 
 
%% Cell type:code id:f9550ac3 tags:
 
``` python
D = (Ntaps - 1) / 2
 
# Create Hilbert Transform filter
if method == 'time domain':
htReal = dsp.hilbert_delay(Ntaps)
htImag = dsp.hilbert_response(Ntaps)
htComplex = htReal + 1j * htImag
else: # 'complex BPF'
# Start with LPF
print('LPF remez')
if halfband:
fcutoff = fadc / 4 # half band filter
print('LPF fcutoff = %.1f MHz(= halfband)' % fcutoff)
else:
fcutoff = fadc / 9
print('LPF fcutoff = %.1f MHz' % fcutoff)
w_tb = fcutoff / 5 # transition bandwidth
f_pb = fcutoff - w_tb / 2
f_sb = fcutoff + w_tb / 2
weight = [1, 1]
print('. w_tb = %.1f MHz' % w_tb)
print('. f_pb = %.1f MHz' % f_pb)
print('. f_sb = %.1f MHz' % f_sb)
print('. weight = %s' % weight)
lpCoefs = signal.remez(Ntaps, [0, f_pb, f_sb, fadc / 2], [1, 0], weight, fs=fadc)
# Create with complex BPF
fcenter = fadc / 4
k = np.arange(0, Ntaps)
hfCoefs = lpCoefs * np.exp(1j * 2 * np.pi * fcenter / fadc * (k - D))
htComplex = 2 * hfCoefs # scale factor for missing negative frequency
 
b = htComplex
a = np.array([1.0])
 
# Plot HT impulse response
plt.figure(figsize=[16, 8])
dsp.plot_time_response(b.imag, name='b.imag', markers=True)
```
 
%% Output
 
LPF remez
LPF fcutoff = 22.2 MHz
. w_tb = 4.4 MHz
. f_pb = 20.0 MHz
. f_sb = 24.4 MHz
. weight = [1, 1]
 
 
%% Cell type:code id:ac01757c tags:
 
``` python
# Apply HT filter
y_analytic = signal.lfilter(b, a, x)
 
y_envelope = np.abs(y_analytic)
y_phase = np.unwrap(np.angle(y_analytic))
y_frequency = (np.diff(y_phase) / (2.0 * np.pi) * fadc)
 
t_delay = D / fadc
 
# Plot
plt.figure(figsize=[16, 8])
plt.title('Envelope of %s with %s modulation' % (carrier, modulation))
plt.plot(t, x, 'b', t - t_delay, y_envelope, 'r', t - t_delay, -y_envelope, 'g')
plt.grid(True)
plt.savefig('envelope.jpg')
#plt.xlim([2, 3])
 
# Frequency of chirp
# . frequency is positive if FIR coefficients are applied in correct order, so b = htComplex and
# not b = np.flip(htComplex)
plt.figure(figsize=[16, 8])
plt.title('Frequency of %s with %s modulation' % (carrier, modulation))
plt.plot(t[1:] - t_delay, y_frequency)
plt.grid(True)
#plt.xlim([2, 3])
```
 
%% Output
 
 
 
%% Cell type:code id:011fa051 tags:
 
``` python
```
......
Source diff could not be displayed: it is too large. Options to address this: view the blob.
Source diff could not be displayed: it is too large. Options to address this: view the blob.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment