Skip to content
Snippets Groups Projects
Commit d2140911 authored by Eric Kooistra's avatar Eric Kooistra
Browse files

Put pfb_os and rtdsp in new separate repository rtdsp.

parent 189cebdb
No related branches found
No related tags found
No related merge requests found
Pipeline #102010 passed with warnings
#! /usr/bin/env python3
###############################################################################
#
# Copyright 2024
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
# Author: Eric Kooistra
# Purpose: Multi channel analysis filterbank and multi channel synthesis
# filterbank, using DFT and IDFT
# Description:
#
# Usage:
# . Use [2] to verify this code.
#
# References:
# [1] dsp_study_erko.txt
# [2] pfb_os/multirate_mixer.ipynb
#
# Books:
# . LYONS
# . HARRIS
# . CROCHIERE
import numpy as np
from sys import exit
from .multirate import fold, \
PolyPhaseFirFilterStructure, \
polyphase_data_for_downsampling_whole_x
def analysis_dft_filterbank(x, Ndown, Ndft, coefs, structure, commutator=None, verbosity=1):
"""Analysis DFT filterbank with Ros = Ndft / Ndown.
Implements WOLA structure for DFT filterbank [CROCHIERE Fig 7.19]. Key
steps:
- Signal x has time index n. Mixer local oscillator (LO) and LPF and
downsampler M, yields the short-time spectrum of signal x at time n = mM,
so M is the block size or downsample rate [CROCHIERE Eq 7.65 = Eq 7.9,
HARRIS Eq 6.1]
- Change from fixed signal time frame n and sliding filter, to sliding time
frame r with fixed filter. This yields factor W_K^(kmM) [CROCHIERE Eq
7.69], with K = Ndft.
- Assume filter h length is Ncoefs = Ntaps * K, for K frequency bins. The
h[-r] weights the signal at n = nM, so the DFT of the weighted signal has
Ncoefs bins k', but for the filterbank only bins k' = k Ntaps are needed.
This pruned DFT can be calculated by stacking the blocks of K samples of
the weighted signal and then calculating the K point DFT [CROCHIERE Eq
7.73, Fig 7.19, BUNTON]. The stacking sum is like polyphase FIR with K
branches, and data is shifted in per block of M samples from x.
- The time (m) and bin (k) dependend phase shift W_K^(kmM) =
exp(-j 2pi k m M / K) of the DFT output is equivalent to a circular time
shift by l = (m M) % K samples of the DFT input. Circular time shift DFT
theorem [CROCHIERE Fig 7.21]:
x[n] <= DFT => X(k), then
x[(n - l) % K] <= DFT => X(k) exp(-j 2pi k l / K)
- Note: fold = roll -1 then flip = flip then roll +1
Input:
. x: Input signal x[n]
. Ndown: downsample factor and input block size
. Ndft: DFT size, number of polyphases in PFS FIR filter
. coefs: prototype LPF FIR filter coefficients for anti aliasing BPF
. structure: 'wola' or 'pfs'
. commutator: only for 'pfs', counter clockwise 'ccw' to use IDFT or
clockwise 'cw' to use DFT. Both yield identical result Yc, because
IDFT(x) = 1/Ndft * DFT(fold(x)).
- verbosity: when > 0 print() status, else no print()
Return:
. Yc: Downsampled and downconverted output signal Yc[k, m], m = n // M for
all Ndft bins k. Complex baseband signals.
"""
# Oversampling analysis DFT filterbank
if structure == 'wola':
# >>> Use WOLA structure [CROCHIERE Fig 7.19]
# Apply the coefs as a window conform [CROCHIERE Eq 7.70].
# For the pfs the coefs are already in the correct order
pfs = PolyPhaseFirFilterStructure(Ndft, coefs)
# For the delay line the coefs still need to be flipped. This flip is
# omitted in reconstruct.m
wCoefs = np.flip(coefs)
# Get parameters
Ncoefs = pfs.Ncoefs
Ntaps = pfs.Ntaps
Ndelays = pfs.Ndelays
# Determine Nblocks of Ndown samples in x
_, _, _, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros=0)
# Prepend x with Ncoefs - 1 zeros to have first down sampled sample at
# m = 0 start at n = 0, and to fit Nblocks for WOLA
Nzeros = pfs.Ncoefs - 1
xData = np.concatenate((np.zeros(Nzeros), x))
# Prepare output
Yc = np.zeros((Ndft, Nblocks), dtype='cfloat')
# Need time shift offset to align with LO in single channel reference.
# This tOffset = 1 together with the flip of pfsData makes a fold(),
# so that the use of DFT in WOLA analysis is equivalent to using
# IDFT as in LO downconverter and PFS analysis.
tOffset = 1
# PFB loop per Ndown input xData samples
for b in range(Nblocks):
# Time shift
tShift = b * Ndown
# Apply window
tRange = np.arange(Ncoefs) + tShift
tData = xData[tRange] # data for whole window
# 1) Variant using PFS
# Shifting in the (old and new) data for the whole window is
# equivalent to shifting in only the new data
if 0:
# load whole window data
pfs.shift_in_data(np.flip(tData))
else:
# shift in new data block
tBlock = tData[Ncoefs - Ndown:]
pfs.shift_in_data(np.flip(tBlock))
zPoly = pfs.polyDelays * pfs.polyCoefs
# Sum Ntaps
pfsData = np.sum(zPoly, axis=1)
# Flip polyphases 0:Ndft to match time order for DFT input
pfsData = np.flip(pfsData)
# 2) Variant using a delay line like in reconstruct.m
dLine = np.zeros(Ndelays, dtype='float')
# Need to load data at end of delay line to sum the taps correctly
# in case Ncoefs < Ndelays
dLine[Ndelays - Ncoefs : Ndelays] = tData * wCoefs
# Sum Ntaps
sumLine = np.zeros(Ndft, dtype='float')
for t in range(Ntaps):
sumLine += dLine[np.arange(Ndft) + t * Ndft]
# Verify that 1) PFS and 2) delay line yield same result
# . The two variants differ in structure, but yield same result.
if not np.allclose(sumLine, pfsData):
exit('ERROR: wrong analysis WOLA')
# Fixed time reference, roll is modulo Ndft
pfsShifted = np.roll(pfsData, tOffset + tShift)
# DFT
Yc[:, b] = np.fft.fft(pfsShifted)
elif structure == 'pfs':
# PFS with Ndft polyphases and shift in xBlocks of Ndown samples
pfs = PolyPhaseFirFilterStructure(Ndft, coefs)
# Prepend x with Ndown - 1 zeros, and represent x in Nblocks of Ndown
# samples
Nzeros = Ndown - 1
xBlocks, xData, Nx, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros)
# Prepare output
Yc = np.zeros((Ndft, Nblocks), dtype='cfloat')
# PFB loop per Ndown input xData samples
for b in range(Nblocks):
# Filter block, the inData blocks are already flipped in
# polyphase_data_for_downsampling_whole_x()
inData = xBlocks[:, b]
pfsData = pfs.filter_block(inData)
# Time (m) and bin (k) dependend phase shift W_K^(kmM) by circular
# time shift of DFT input
tShift = b * Ndown
pfsShifted = np.roll(pfsData, -tShift) # roll is modulo Ndft
# For 'ccw' apply IDFT, for 'cw' apply DFT(fold()), due to fold()
# both yield identical result
if commutator == 'cw': # DFT
# For 'cw' fold the pfsData to apply the DFT
pfsShifted = np.roll(pfsShifted, -1)
pfsShifted = np.flip(pfsShifted)
Yc[:, b] = np.fft.fft(pfsShifted)
elif commutator == 'ccw': # IDFT
# With 'ccw' keep the pfsData to apply IDFT
Yc[:, b] = np.fft.ifft(pfsShifted) * Ndft
else:
exit('ERROR: invalid commutator ' + str(commutator))
else:
exit('ERROR: invalid structure ' + str(structure))
if verbosity:
Ros = Ndft / Ndown
print('> analysis_dft_filterbank():')
print(' . len(x) =', str(len(x)))
print(' . Nblocks =', str(Nblocks))
print(' . Ros =', str(Ros))
print(' . Ndown =', str(Ndown))
print(' . Ndft =', str(Ndft))
print(' . structure =', structure)
if structure == 'pfs':
print(' . commutator =', commutator)
print('')
return Yc
def synthesis_dft_filterbank(Xbase, Nup, Ndft, coefs, structure, commutator=None, verbosity=1):
"""Synthesis DFT filterbank with Ros = Ndft / Nup.
Implements WOLA structure for DFT filterbank [CROCHIERE Fig 7.20]. Key
steps:
- Signal Xbase has time index m and Ndft values.
Input:
. Xbase: Complex baseband signals for Ndft bins, and Nblocks in time m.
. Nup: upsample factor and output block size
. Ndft: DFT size, number of polyphases in PFS FIR filter
. coefs: prototype LPF FIR filter coefficients for anti aliasing and
interpolating BPF
. structure: 'wola'
. commutator: 'cw' to use DFT or 'ccw' to use IDFT
- verbosity: when > 0 print() status, else no print()
Return:
. y: Upsampled and upconverted output signal y[n].
"""
Ros = Ndft / Nup
# Xbase has Ndft rows and Nblocks columns
Nblocks = np.size(Xbase, axis=1)
# Prepare output
Noutput = Nup * Nblocks + len(coefs)
y = np.zeros(Noutput, dtype='float')
# Adjust LO phase for group delay of analysis LPF and synthesis LPF in
# series. This is not needed in practise, but is needed to be able to
# exactly compare reconstructed signal with original input time series
# signal. Assume analysis coefs and synthesis coefs have linear phase and
# same length (but not necessarily same coefs), then the total group
# delay is (len(coefs) - 1) / 2 + (len(coefs) - 1) / 2.
hPairGroupDelay = len(coefs) - 1
# Oversampling synthesis DFT filterbank
if structure == 'wola':
# >>> Use WOLA structure [CROCHIERE Fig 7.20]
# Apply the coefs as a window conform [CROCHIERE Eq 7.76].
# Use PFS with Ndft polyphases for delay line and coefs window
pfs = PolyPhaseFirFilterStructure(Ndft, coefs)
# Use coefs as window
wCoefs = coefs
# Get parameters
Ncoefs = pfs.Ncoefs
Ntaps = pfs.Ntaps
Ndelays = pfs.Ndelays # zero padded coefs
# PFB loop per Nup output y samples
for b in range(Nblocks):
# For 'ccw' apply IDFT, for 'cw' apply DFT(fold()), due to fold()
# both # yield identical result
if commutator == 'cw': # DFT
# For 'cw' fold the Xbase to apply the DFT
xTime = np.real(np.fft.fft(fold(Xbase[:, b])))
else: # 'ccw', IDFT
# With 'ccw' keep the Xbase to apply IDFT
xTime = Ndft * np.real(np.fft.ifft(Xbase[:, b]))
# Apply filter group delay (hPairGroupDelay), and time (m) and bin
# (k) dependend phase shift W_K^(kmM) by circular time shift of
# DFT output. To get from fixed time reference to sliding time
# reference.
tShift = b * Nup - hPairGroupDelay
xShifted = np.roll(xTime, -tShift) # roll is modulo Ndft
# 1) Variant using PFS.
# Load xTime at Ntaps in polyDelays
pfs.parallel_load(xShifted)
# Apply FIR coefs per delay element
zPoly = Nup * pfs.polyDelays * pfs.polyCoefs
zLine = pfs.map_to_delay_line(zPoly)
zLine = zLine[0:Ncoefs]
# 2) Variant using a delay line like in reconstruct.m
# Copy data at Ntaps
dLine = np.zeros(Ndelays, dtype='float')
for t in range(Ntaps):
dLine[np.arange(Ndft) + t * Ndft] = xShifted
# Apply window
yLine = np.zeros(Ncoefs, dtype='float')
yLine = Nup * dLine[0:Ncoefs] * wCoefs
# Verify that 1) PFS and 2) delay line yield same result
# . Both structures are in fact identical, because both use
# parallel load of the xShifted data.
if not np.allclose(zLine, yLine):
exit('ERROR: wrong sythesis WOLA')
# Overlap add weigthed input to the output
tRange = np.arange(pfs.Ncoefs) + b * Nup
y[tRange] += zLine
else:
# No 'pfs' for fractional oversampled synthesis, because the
# interpolator cannot be separated in independent branches
# then. Expect when Ros is integer [CROCHIERE 7.2.4].
exit('ERROR: invalid structure ' + str(structure))
if verbosity:
print('> synthesis_dft_filterbank():')
print(' . Nblocks =', str(Nblocks))
print(' . Ros =', str(Ros))
print(' . Nup =', str(Nup))
print(' . Ndft =', str(Ndft))
print(' . structure =', structure)
print(' . commutator =', commutator)
print('')
return y
This diff is collapsed.
#! /usr/bin/env python3
###############################################################################
#
# Copyright 2024
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
# Author: Eric Kooistra
# Purpose: Fourier transform functions for DSP
# Description:
#
# References:
# [1] dsp_study_erko.txt
import numpy as np
from .utilities import c_rtol, c_atol, ceil_pow2, is_even
###############################################################################
# Time domain interpolation using DFT
###############################################################################
def fourier_interpolate(HFfilter, Ncoefs, verbosity=1):
"""Use Fourier interpolation to create final FIR filter coefs.
HF contains filter transfer function for N points, in order 0 to fs. The
interpolation inserts Ncoefs - N zeros and then performs IFFT to get the
interpolated impulse response.
Use phase correction dependent on interpolation factor Q to have fractional
time shift of hInterpolated, to make it symmetrical. Similar as done in
pfs_coeff_final.m and pfir_coeff.m. Use upper = conj(lower), because that
is easier than using upper from HFfilter.
Reference: LYONS 13.27, 13.28
"""
N = len(HFfilter)
K = N // 2
# Interpolation factor Q can be fractional, for example:
# - Ncoefs = 1024 * 16
# . firls: N = 1024 + 1 --> Q = 15.98439
# . remez: N = 1024 --> Q = 16
Q = Ncoefs / N
# Apply phase correction (= time shift) to make the impulse response
# exactly symmetric
f = np.arange(N) / Ncoefs
p = np.exp(-1j * np.pi * (Q - 1) * f)
HF = HFfilter * p
HFextended = np.zeros(Ncoefs, dtype=np.complex_)
if is_even(N):
# Copy DC and K - 1 positive frequencies in lower part (K values)
HFextended[0:K] = HF[0:K] # starts with DC at [0]
# Create the upper part of the spectrum from the K - 1 positive
# frequencies and fs/2 in lower part (K values)
HFflip = np.conjugate(np.flip(HF[0:K + 1])) # contains DC at [0]
HFextended[-K:] = HFflip[0:K] # omit DC at [K]
# K + K = N values, because N is even and K = N // 2
else:
# Copy DC and K positive frequencies in lower part (K + 1 values)
HFextended[0:K + 1] = HF[0:K + 1] # starts with DC at [0]
# Create the upper part of the spectrum from the K positive
# frequencies in the lower part (K values)
HFflip = np.conjugate(np.flip(HF[0:K + 1])) # contains DC at [0]
HFextended[-K:] = HFflip[0:K] # omit DC at [K]
# K + 1 + K = N values, because N is odd and K = N // 2
hInterpolated = np.fft.ifft(HFextended)
if np.allclose(hInterpolated.imag, np.zeros(Ncoefs), rtol=c_rtol, atol=c_atol):
if verbosity:
print('hInterpolated.imag ~= 0')
else:
print('WARNING: hInterpolated.imag != 0 (max(abs) = %e)' % np.max(np.abs(hInterpolated.imag)))
return hInterpolated.real
###############################################################################
# Discrete Time Fourier Transform (DTFT)
###############################################################################
def dtft(coefs, Ndtft=None, zeroCenter=True, fftShift=True):
"""Calculate DTFT of filter impulse response or window.
Use DFT with Ndtft points, to have frequency resolution of 2 pi / Ndtft.
Define h by extending coefs with Ndtft - Ncoefs zeros, where Ncoefs =
len(coefs). This DFT approaches the DTFT using bandlimited interpolation,
similar to INTERP() which interpolates by factor L = Ndtft / Ncoefs [JOS1].
Input:
. coefs: filter impulse response or window coefficients
. Ndtft: number of points in DFT to calculate DTFT
. zeroCenter: when True zero center h to have even function that aligns
with cos() in DTFT, for zero-phase argument (+-1 --> 0, +-pi). Else
apply h as causal function.
. fftShift: when True fft shift to have -0.5 to +0.5 frequency axis,
else use 0 to 1.0.
Return:
. h: zero padded coefs
. fn: normalized frequency axis for HF, fs = 1
. HF: dtft(h), the frequency transfer function of h
"""
c_interpolate = 10
Ncoefs = len(coefs)
if Ndtft is None:
Ndtft = ceil_pow2(Ncoefs * c_interpolate)
# Time series, causal with coefs at left in h
h = np.concatenate((coefs, np.zeros([Ndtft - Ncoefs])))
if zeroCenter:
# Zero center h to try to make it even
h = np.roll(h, -(Ncoefs // 2))
# DFT
HF = np.fft.fft(h)
# Normalized frequency axis, fs = 1, ws = 2 pi
fn = np.arange(0, 1, 1 / Ndtft) # fn = 0,pos, neg
if fftShift:
# FFT shift to center HF, fn = neg, 0,pos
fn = fn - 0.5
HF = np.roll(HF, Ndtft // 2)
return h, fn, HF
###############################################################################
# Estimate f for certain gain or estimate gain for certain frequency
###############################################################################
def estimate_gain_at_frequency(f, HF, freq):
"""Find gain = abs(HF) at frequency in f that is closest to freq.
Input:
. f: frequency axis as f = fn * fs for fn from dtft() with fftShift=True
. HF: spectrum as from dtft() with fftShift=True
. freq : frequency to look for in f
Return:
. fIndex: index of frequency in f that is closest to freq
. fValue: frequency at fIndex
. fGain : gain = abs(HF) at fIndex
"""
fDiff = np.abs(f - freq)
fIndex = fDiff.argmin()
fValue = f[fIndex]
fGain = np.abs(HF[fIndex])
return fIndex, fValue, fGain
def estimate_frequency_at_gain(f, HF, gain):
"""Find positive frequency in f that has gain = abs(HF) closest to gain.
Look only in positive frequencies and from highest frequency to lowest
frequency to avoid band pass ripple gain variation in LPF.
Input:
. f: frequency axis as f = fn * fs for fn from dtft() with fftShift=True
. HF: spectrum as from dtft() with fftShift=True
. gain : gain to look for in HF
Return:
. fIndex: index of frequency in f that has abs(HF) closest to gain
. fValue: frequency at fIndex
. fGain : gain = abs(HF) at fIndex
"""
# Look only in positive frequencies
pos = len(f[f <= 0])
HFpos = HF[pos:]
HFgain = np.abs(HFpos)
HFdiff = np.abs(HFgain - gain)
# Flip to avoid detections in LPF band pass
HFdiff = np.flipud(HFdiff)
fIndex = pos + len(HFpos) - 1 - HFdiff.argmin()
fValue = f[fIndex]
fGain = np.abs(HF[fIndex])
return fIndex, fValue, fGain
###############################################################################
# Radix FFT
###############################################################################
# def radix_fft():
# """Calculate DFT using smaller DFTs
#
# Use DFTs with radices equal to the greatest common dividers (gcd) of the
# input DFT size.
#
# https://stackoverflow.com/questions/9940192/how-to-calculate-a-large-size-fft-using-smaller-sized-ffts
# """
#! /usr/bin/env python3
###############################################################################
#
# Copyright 2024
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
# Author: Eric Kooistra
# Purpose: Hilbert transform functions for DSP
# Description:
#
# References:
# [1] dsp_study_erko.txt
import numpy as np
from .utilities import is_even
###############################################################################
# Hilbert transform filter
###############################################################################
def hilbert_response(Ntaps):
"""Calculate impulse response of Hilbert filter truncated to Ntaps
coefficients.
h(t) = 1 / (pi t) ( 1 - cos(ws t / 2), l'Hoptial's rule: h(0) = 0
For n = ...-2, -1, 0, 1, 2, ..., and Ts = 1, and Ntaps is
. odd: t = n Ts
ht[n] = 1 / (pi n)) ( 1 - cos(pi n))
= 0, when n is even
= 2 / (pi n), when n is odd
. even: t = (n + 0.5) Ts
ht(n) = 1 / (pi m)) ( 1 - cos(pi m)), with m = n + 0.5
"""
Npos = Ntaps // 2
if is_even(Ntaps):
ht = np.zeros(Npos)
for n in range(0, Npos):
m = n + 0.5
ht[n] = 1 / (np.pi * m) * (1 - np.cos(np.pi * m))
ht = np.concatenate((np.flip(-ht), ht))
else:
Npos += 1
ht = np.zeros(Npos)
for n in range(1, Npos, 2):
ht[n] = 2 / (np.pi * n)
ht = np.concatenate((np.flip(-ht[1:]), ht))
return ht
def hilbert_delay(Ntaps):
"""Delay impulse by (Ntaps - 1) / 2 to align with hilbert_response(Ntaps).
Analytic signal htComplex = htReal + 1j * htImag where:
. htReal = hilbert_delay(Ntaps)
. htImag = hilbert_response(Ntaps)
Only support integer delay D = (Ntaps - 1) / 2, so Ntaps is odd, then return
htReal, else return None.
"""
if is_even(Ntaps):
return None
D = (Ntaps - 1) // 2
htReal = np.zeros(Ntaps)
htReal[D] = 1.0
return htReal
#! /usr/bin/env python3
###############################################################################
#
# Copyright 2024
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
# Author: Eric Kooistra
# Purpose: IIR filter design functions for DSP
# Description:
#
# References:
# [1] dsp_study_erko.txt
# [2] https://webaudio.github.io/Audio-EQ-Cookbook/Audio-EQ-Cookbook.txt
import numpy as np
###############################################################################
# IIR Filter design
###############################################################################
def iir_biquad_butter_coefficients(f0, BW, band='bandstop', fs=1):
"""Calculate b0, b1, b2, a0, a1, a2 for biquad IIR filter section.
Bilinear transform of Butterworth filters, from [2].
For single biquad bandpass alternatively use signal.iirpeak()
For single biquad bandstop alternatively use signal.iirnotch()
"""
w0 = 2 * np.pi * f0 / fs
Q = f0 / BW
si = np.sin(w0)
cs = np.cos(w0)
alpha = np.sin(w0) / (2 * Q)
a1 = -2 * cs
if band == 'lowpass':
b0 = 1 - cs
b = [b0 / 2, b0, b0 / 2]
elif band == 'highpass':
b0 = 1 + cs
b = [b0 / 2, -b0, b0 / 2]
elif band == 'bandstop':
b = [1, a1, 1]
elif band == 'bandpass':
b = [si / 2, 0, -si / 2]
a = [1 + alpha, a1, 1 - alpha]
return b, a
This diff is collapsed.
#! /usr/bin/env python3
###############################################################################
#
# Copyright 2024
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
# Author: Eric Kooistra
# Purpose: Plotting functions for DSP
# Description:
#
# References:
# [1] dsp_study_erko.txt
# [2] https://github.com/chipmuenk/dsp/blob/main/notebooks/02_LTF/LTF-IIR-allgemein.ipynb
import numpy as np
from scipy import signal
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
import dsp_fpga_lib # from [2]
from .utilities import pow_db
def plot_time_response(h, title='', color='r', markers=False):
"""Plot time response (= impulse response, window, FIR filter coefficients).
Input:
. h: time response
. title: plot title
. color: curve color format character
. markers: when True plot time sample markers in curve
"""
if markers:
plt.plot(h, color + '-', h, color + 'o')
else:
plt.plot(h, color + '-')
if title:
plt.title(title)
else:
plt.title('Time response')
plt.ylabel('Voltage')
plt.xlabel('Sample')
plt.grid(True)
def plot_iir_filter_analysis(b, a, fs=1.0, whole=False, Ntime=100, step=False, log=False, show=[]):
"""Plot and print iir filter analysis results.
Input:
. b, a: IIR filter coefficients in same format as for scipy.signal.freqz
. fs: sample frequency
. whole: False to have only positive frequencies, so 0 to fs / 2 in
spectrum plots, True to have 0 to fs.
. Ntime: number of timesamples for impulse response
. step: False for impulse response, True for step response
. show[]: which plots to show from: 'zplane', 'power spectrum',
'phase spectrum', 'time response'
Return: z, p, k
"""
# Plot poles / zeros diagram in z-plane
if 'zplane' in show:
fig1, ax1 = plt.subplots(1)
ax1.set_title('Zeros (o) and poles (x) in z-plane')
z, p, k = dsp_fpga_lib.zplane(b, a, plt_ax=ax1) # uses np.roots(a), np.roots(b)
else:
z, p, k = dsp_fpga_lib.zplane(b, a) # no plot, only calculate z, p, k
if log:
print('plot_iir_filter_analysis()')
print('Zeros, poles and gain from b, a coefficients:')
if len(z) > 0:
print('. zeros:')
for zero in z:
print(' z = %s' % str(zero))
if len(p) > 0:
print('. poles:')
for pole in p:
print(' p = %s' % str(pole))
print('. gain: k = %.3f' % k)
# Derive b, a coefficients back from z, p, k
print('Coefficients back from z, p, k:')
print(' b = %s' % str(np.poly(z)))
print(' a = %s' % str(np.poly(p) / k))
print('')
# Plot transfer function H(f), is H(z) for z = exp(j w), so along the unit circle
# . 0 Hz at 1 + 0j, fs / 4 at 0 + 1j, fNyquist = fs / 2 at -1 + 0j
# . use whole=False to have only positive frequencies, so 0 to fs / 2
# . use Nfreq frequency points
if 'power spectrum' in show or 'phase spectrum' in show:
Nfreq = 1024
[f, HF] = signal.freqz(b, a, Nfreq, whole=whole, fs=fs)
if 'power spectrum' in show:
fig2, ax2 = plt.subplots(1)
ax2.plot(f, pow_db(HF))
ax2.set_title('Power spectrum')
ax2.set_xlabel('frequency [fs = %f]' % fs)
ax2.set_ylabel('HF power [dB]')
if 'phase spectrum' in show:
fig3, ax3 = plt.subplots(1)
ax3.plot(f, np.unwrap(np.angle(HF)))
ax3.set_title('Phase spectrum')
ax3.set_xlabel('frequency [fs = %f]' % fs)
ax3.set_ylabel('HF phase [rad]')
# Plot impulse response
if 'time response' in show:
Ts = 1 / fs
fig4, ax4 = plt.subplots(1)
# impz uses signal.lfilter(), and cumsum(h) for step response
[h, t] = dsp_fpga_lib.impz(b, a, FS=fs, N=Ntime, step=step)
(ml, sl, bl) = ax4.stem(t, h, linefmt='b-', markerfmt='ro', basefmt='k')
if step:
ax4.set_title('Step response')
else:
ax4.set_title('Impulse response')
ax4.set_xlabel('time [Ts = %f]' % Ts)
ax4.set_ylabel('h[n]')
return z, p, k
def plot_spectra(fn, HF, fs=1.0, fLim=None, dbLim=None, aLim=None):
"""Plot spectra for power, magnitude, phase, real, imag
Input:
. fn: normalized frequency axis for HF (fs = 1)
. HF: spectrum, e.g. frequency transfer function HF = DTFT(h)
. fs: sample frequency in Hz
"""
Hmag = np.abs(HF)
# Hphs = np.unwrap(np.angle(HF))
Hphs = np.angle(HF)
Hpow_dB = pow_db(HF) # power response
f = fn * fs # scale fn by fs
flabel = 'Frequency [fs = %f]' % fs
plt.figure(1)
plt.plot(f, Hpow_dB)
plt.title('Power spectrum')
plt.ylabel('Power [dB]')
plt.xlabel(flabel)
if fLim:
plt.xlim(fLim)
if dbLim:
plt.ylim(dbLim)
plt.grid(True)
plt.figure(2)
plt.plot(f, HF.real, 'r')
plt.plot(f, HF.imag, 'g')
plt.title('Complex voltage spectrum')
plt.ylabel('Voltage')
plt.xlabel(flabel)
plt.legend(['real', 'imag'])
if fLim:
plt.xlim(fLim)
plt.grid(True)
plt.figure(3)
plt.plot(f, Hmag)
plt.title('Magnitude spectrum') # = amplitude
plt.ylabel('Voltage')
plt.xlabel(flabel)
if fLim:
plt.xlim(fLim)
plt.grid(True)
plt.figure(4)
plt.plot(f, Hphs)
plt.title('Phase spectrum (note -1: pi = -pi)')
plt.ylabel('Phase [rad]')
plt.xlabel(flabel)
if fLim:
plt.xlim(fLim)
if aLim:
plt.ylim(aLim)
plt.grid(True)
def plot_phase_spectrum(fn, HF, fmt='r', fs=1.0, fLim=None, aLim=None):
"""Plot phase spectrum
Use fLim and aLim to zoom in, to see slope in pass band. Note that -pi =
pi.
Input:
. fn: normalized frequency axis for HF (fs = 1)
. HF: spectrum, e.g. frequency transfer function HF = DTFT(h)
. fmt: curve format string
. fs: sample frequency in Hz
"""
f = fn * fs # scale fn by fs
flabel = 'Frequency [fs = %f]' % fs
# Hphs = np.unwrap(np.angle(HF))
Hphs = np.angle(HF)
plt.plot(f, Hphs, fmt)
plt.title('Phase spectrum')
plt.ylabel('Pase [rad]')
plt.xlabel(flabel)
if fLim:
plt.xlim(fLim)
if aLim:
plt.ylim(aLim)
plt.grid(True)
def plot_power_spectrum(fn, HF, fmt='r', fs=1.0, fLim=None, dbLim=None):
"""Plot power spectrum
Input:
. fn: normalized frequency axis for HF (fs = 1)
. HF: spectrum, e.g. frequency transfer function HF = DTFT(h)
. fmt: curve format string
. fs: sample frequency in Hz
"""
f = fn * fs # scale fn by fs
flabel = 'Frequency [fs = %f]' % fs
plt.plot(f, pow_db(HF), fmt)
plt.title('Power spectrum')
plt.ylabel('Power [dB]')
plt.xlabel(flabel)
if fLim:
plt.xlim(fLim)
if dbLim:
plt.ylim(dbLim)
plt.grid(True)
def plot_magnitude_spectrum(fn, HF, fmt='r', fs=1.0, fLim=None, voltLim=None):
"""Plot magnitude (= voltage) spectrum
Input:
. fn: normalized frequency axis for HF (fs = 1)
. HF: spectrum, e.g. frequency transfer function HF = DTFT(h)
. fmt: curve format string
. fs: sample frequency in Hz
"""
f = fn * fs # scale fn by fs
flabel = 'Frequency [fs = %f]' % fs
plt.plot(f, np.abs(HF), fmt)
plt.title('Magnitude spectrum')
plt.ylabel('Voltage')
plt.xlabel(flabel)
if fLim:
plt.xlim(fLim)
if voltLim:
plt.ylim(voltLim)
plt.grid(True)
def plot_two_power_spectra(fn1, HF1, name1, fn2, HF2, name2, fs=1.0, fLim=None, dbLim=None, showRoll=False):
"""Plot two power spectra in same plot for comparison
Input:
. fn1,fn2: normalized frequency axis for HF1, HF2 (fs = 1)
. HF1, HF2: spectrum, e.g. frequency transfer function HF = DTFT(h)
. fs: sample frequency in Hz
"""
f1 = fn1 * fs # scale fn1 by fs
f2 = fn2 * fs # scale fn1 by fs
flabel = 'Frequency [fs = %f]' % fs
if showRoll:
plt.plot(f1, pow_db(HF1), 'r',
f1, np.roll(pow_db(HF1), len(f1) // 2), 'r--',
f2, pow_db(HF2), 'b',
f2, np.roll(pow_db(HF2), len(f2) // 2), 'b--')
plt.legend([name1, '', name2, ''])
else:
plt.plot(f1, pow_db(HF1), 'r',
f2, pow_db(HF2), 'b')
plt.legend([name1, name2])
plt.title('Power spectrum')
plt.ylabel('Power [dB]')
plt.xlabel(flabel)
if fLim:
plt.xlim(fLim)
if dbLim:
plt.ylim(dbLim)
plt.grid(True)
def plot_colorbar(plt):
# Work around using make_axes_locatable() to avoid:
# /tmp/ipykernel_9570/3561080291.py:12: MatplotlibDeprecationWarning: Auto-removal of grids by
# pcolor() and pcolormesh() is deprecated since 3.5 and will be removed two minor releases
# later; please call grid(False) first.
# Using plt.grid(False) is not sufficient to avoid the warning.
ax = plt.gca()
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size='5%', pad=0.1)
cax.grid(False)
plt.colorbar(cax=cax)
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment