Skip to content
Snippets Groups Projects

Resolve RTSD-265

Merged Eric Kooistra requested to merge RTSD-265 into master
1 file
+ 3
8
Compare changes
  • Side-by-side
  • Inline
%% Cell type:markdown id:c69a2eb8 tags:
# Try polyphase filterbank (PFB)
Author: Eric Kooistra, apr 2024
Purpose:
* Practise DSP [1].
* Try to reproduce LOFAR subband filterbank in Python instead of MATLAB [2]
* Investigate reconstruction using critically sampled analysis PFB and synthesis PFB
Result:
* Reconstruction with SNR = pow(input) / pow(input - reconctructed) > 18 dB does not seem feasible, for any firType. Increasing Ntaps also does not help.
* Stop band attenuation is important for loss due to aliasing from other than neighbour subband
* Half power gain is important for all pass transfer function across neighbouring subbands
* LOFAR subband filter achieves SNR = 14 dB with hpFactor = 0.96 to have fcutoff ~= sqrt(0.5)
References:
1. dsp_study_erko, summary of DSP books
2. https://git.astron.nl/rtsd/apertif_matlab/-/blob/master/matlab/one_pfb.m
%% Cell type:code id:02689e50 tags:
``` python
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
```
%% Cell type:code id:65235f50 tags:
``` python
# Auto reload module when it is changed
%load_ext autoreload
%autoreload 2
# Add rtdsp module path to $PYTHONPATH
import os
import sys
module_path = os.path.abspath(os.path.join('../'))
if module_path not in sys.path:
sys.path.insert(0, module_path)
# Import rtdsp
from rtdsp.utilities import pow_db
from rtdsp.firfilter import prototype_fir_low_pass_filter, design_fir_low_pass_filter, \
design_fir_low_pass_filter_adjust, \
raised_cosine_response, square_root_raised_cosine_response, \
filterbank_impulse_response, filterbank_frequency_response
from rtdsp.fourier import dtft, estimate_gain_at_frequency
from rtdsp.multirate import PolyPhaseFirFilterStructure
from rtdsp.plotting import plot_power_spectrum, plot_magnitude_spectrum, plot_colorbar, \
plot_time_response
```
%% Output
The autoreload extension is already loaded. To reload it, use:
%reload_ext autoreload
%% Cell type:code id:a39e99a2 tags:
``` python
atol = 1e-8
```
%% Cell type:code id:3436bc2a tags:
``` python
# Filterbank
Ntaps = 8 #8 # number of taps per poly phase FIR filter
Ndft = 16 #16 DFT size
Ncoefs = Ndft * Ntaps # number of coefficients in prototype LPF
# Select FFT or IFFT, end result is the same, but subband frequencies differ in sign
analysisFFT = True # analysis FFT --> synthesis IFFT as in LOFAR
# analysisFFT = False # analysis IFFT --> synthesis FFT as in [HARRIS 6, 7]
```
%% Cell type:code id:ca6b8c9d tags:
``` python
# Select LPF type
firType = 'rect'
firType = 'firwin'
#firType = 'firls'
#firType = 'srRemezAdjust'
#firType = 'srRemezCutoff'
#firType = 'srCos'
```
%% Cell type:code id:74eb2365 tags:
``` python
# Select WG signal
wgType = 'chirp'
wgType = 'noise'
```
%% Cell type:code id:55e0fe37 tags:
``` python
# Samples
fs = 1 # sample rate
ts = 1 / fs # sample period
```
%% Cell type:code id:20ae2f4a tags:
``` python
# Subbands
Nsub = Ndft // 2 # number of subbands in fs / 2
fsub = fs / Ndft # subband frequency
tsub = 1 / fsub # subband period
```
%% Cell type:markdown id:a949b9ff tags:
# 1 Waveform generator (WG)
%% Cell type:code id:e08b5ba2 tags:
``` python
if wgType == 'chirp':
# Chirp signal
chirpStartSub = 1 # start chirp subband
chirpNofSub = 10 # number of chirp subbands
#chirpNofSub = Nsub
NSimSub = chirpNofSub * 10 # number of subband periods to simulate
NSimSamples = NSimSub * Ndft
fstart = fsub * chirpStartSub
fend = fsub * (chirpStartSub + chirpNofSub)
tsim = NSimSub * tsub
t = np.arange(0, tsim, ts) # len(t) == NSimSamples
wgData = signal.chirp(t, f0=fstart, f1=fend, t1=tsim, method='linear')
print('Chirp:')
print('. chirpStartSub = %d' % chirpStartSub)
print('. chirpNofSub = %d' % chirpNofSub)
print('. NSimSub = %d' % NSimSub)
print('. NSimSamples = %d' % NSimSamples)
# Marker pulses at subband center frequencies
wgMarkers = np.zeros(NSimSamples)
nofStep = (fend - fstart) / fsub
tStep = tsim / nofStep
tMark = 0
while tMark < tsim:
tIndex = int(NSimSamples * tMark / tsim)
wgMarkers[tIndex] = 1
tMark += tStep
plt.plot(t, wgData, 'b')
plt.plot(t, wgMarkers, 'r')
plt.title('Linear chirp, f = ' + str(fstart) + ' to f = ' + str(fend))
plt.xlabel('t [ts = ' + str(ts) + ']')
plt.ylabel('voltage')
#plt.xlim(700, 900)
plt.grid(True)
```
%% Cell type:code id:bd4054d7 tags:
``` python
if wgType == 'noise':
NSimSub = Ntaps * 5 # number of subband periods to simulate
NSimSamples = NSimSub * Ndft
tsim = NSimSub * tsub
t = np.arange(0, tsim, ts) # len(t) == NSimSamples
# Broadband noise
rng = np.random.default_rng(seed=7)
mu = 0.0
sigma = 1.0
wgData = rng.normal(mu, sigma, NSimSamples)
# Marker pulses at subband periods
wgMarkers = np.zeros(NSimSamples)
tStep = tsub * 3
nofStep = int(tsim / tStep)
tIndex = 0
while tIndex < NSimSamples:
wgMarkers[tIndex] = 1
tIndex += Ndft
plt.plot(t, wgData, 'b')
plt.plot(t, wgMarkers, 'r')
plt.title('Broadband noise')
plt.xlabel('t [ts = ' + str(ts) + ']')
plt.ylabel('voltage')
#plt.xlim(0, 100)
plt.grid(True)
```
%% Output
%% Cell type:markdown id:acca4f19 tags:
# 2 Prototype FIR low pass filter
%% Cell type:code id:181d6c07 tags:
``` python
# Use rectangular prototype FIR filter, for transparant response of DFT filterbank
if firType == 'rect':
hPrototype = np.zeros(Ncoefs)
hPrototype[0 : Ndft] = 1 / Ndft
```
%% Cell type:code id:6108ff59 tags:
``` python
# Use LOFAR subband prototype FIR filter, to minimize aliasing
if firType == 'firls':
hpFactor = 0.90
transitionFactor = 0.4
hpFactor = 0.97
transitionFactor = 0.2
stopRippleFactor = 1 # 1000000
kaiserBeta = 0 # 1
hPrototype = prototype_fir_low_pass_filter('firls',
Ndft, Ntaps, Ncoefs, hpFactor, transitionFactor, stopRippleFactor, kaiserBeta, fs)
```
%% Cell type:code id:3ec78793 tags:
``` python
# Use windowed sync prototype FIR filter, is Portnoff window
if firType == 'firwin':
BWbin = 1 / Ndft # bandwidth of one bin
# . Use half power bandwidth factor to tune half power cutoff frequency of LPF, default 1.0
hpFactor = 1.11
hpFactor = 1.0
BWpass = hpFactor * BWbin
fpass = BWpass / 2 # bin at DC: -fpass to +fpass
fcutoff = fpass
hPrototype = signal.firwin(Ncoefs, fcutoff, window='hann', fs=fs)
```
%% Cell type:code id:da37c5b9 tags:
``` python
# Use square root gain at fcutoff prototype FIR filter, to minimize aliasing
if firType == 'srRemezAdjust':
cutoffBeta = 0.2
fcutoff = fsub / 2 # center frequency of transition band
fpass = (1 - cutoffBeta) * fcutoff # pass band frequency
fstop = (1 + cutoffBeta) * fcutoff # stop band frequency
# Gain at fcutoff center frequency of the transition band
cutoffGain = np.sqrt(0.5)
#cutoffGain = 0.5
# weight pass band ripple versus stop band ripple
rippleWeights = [1, 1]
hPrototype = design_fir_low_pass_filter_adjust('remez',
Ncoefs, fpass, fstop, fcutoff, cutoffGain, rippleWeights, fs=fs, resolution=1e-4)
```
%% Cell type:code id:4f11bd88 tags:
``` python
# Use square root gain at fcutoff prototype FIR filter, to minimize aliasing
if firType == 'srRemezCutoff':
cutoffBeta = 0.2
fcutoff = fsub / 2 # center frequency of transition band
fpass = (1 - cutoffBeta) * fcutoff # pass band frequency
fstop = (1 + cutoffBeta) * fcutoff # stop band frequency
# Gain at fcutoff center frequency of the transition band
cutoffGain = np.sqrt(0.5)
# weight pass band ripple versus stop band ripple, and of zero width interval in the transition band
rippleWeights = [1, 1, 1]
hPrototype = design_fir_low_pass_filter('remez',
Ncoefs, fpass, fstop, fcutoff, cutoffGain, rippleWeights, fs=fs)
```
%% Cell type:code id:a3b429a8 tags:
``` python
if firType == 'srCos':
rollOffBeta = 0.2
Nsps = Ndft
hPrototype = square_root_raised_cosine_response(Ncoefs, Nsps, rollOffBeta)
```
%% Cell type:code id:8e867248 tags:
``` python
# Plot impulse response
plt.figure(1)
plt.plot(hPrototype)
plt.title('Impulse response')
#plt.xlim((50, 60))
plt.grid(True)
# Plot transfer function
plt.figure(2)
fLim = (-2, 2)
#fLim = None
dbLim = (-140, 5)
h, f, HF = dtft(hPrototype)
plot_power_spectrum(f, HF, 'g', fs * Ndft, fLim, dbLim)
plt.figure(3)
voltLim = None
plot_magnitude_spectrum(f, HF, 'g', fs * Ndft, fLim, voltLim)
# Log gain at fsub / 2
fIndex, fValue, fGain = estimate_gain_at_frequency(f, HF, fsub / 2)
print('estimate_gain_at_frequency f = 0.5 [fsub]')
print('. fIndex = %d' % fIndex)
print('. fValue = %e [fsub]' % (fValue / fsub))
print('. fGain**2 = %e' % fGain**2)
```
%% Output
estimate_gain_at_frequency f = 0.5 [fsub]
. fIndex = 1088
. fValue = 5.000000e-01 [fsub]
. fGain**2 = 2.493174e-01
%% Cell type:code id:55808f1c tags:
``` python
# Filterbank aggregate frequency response
h, f, HFprototype = dtft(hPrototype)
HFbank = filterbank_frequency_response(HFprototype, Ndft)
fLim = None
fLim = (0, 2)
dbLim = None
#dbLim = (-20, 5)
#dbLim = (-0.2, 0.2)
voltLim = None
voltLim = (0, 2)
#voltLim = (0.9, 1.1)
plt.figure(1)
plot_power_spectrum(f, HFbank, 'g', fs * Ndft, fLim, dbLim)
plt.figure(2)
plot_magnitude_spectrum(f, HFbank, 'g', fs * Ndft, fLim, voltLim)
```
%% Output
%% Cell type:code id:6eb3b494 tags:
``` python
# Filterbank aggregate time response
hbank = filterbank_impulse_response(hPrototype, Ndft)
plot_time_response(hbank, title='Filterbank impulse response', color='r', markers=False)
```
%% Output
%% Cell type:code id:eee415a1 tags:
``` python
# Polyphase FIR filter, can use same for analysis PFB (down) and synthesis PFB (up)
pfs = PolyPhaseFirFilterStructure(Ndft, hPrototype)
```
%% Cell type:markdown id:d3628f9f tags:
# 3 Analysis filterbank
%% Cell type:code id:9468907a tags:
``` python
pfs.reset_poly_delays()
pfsDownData = np.zeros((NSimSub, Ndft))
dftData = np.zeros((NSimSub, Ndft), dtype=np.complex_)
# pfir + dft
for b in range(NSimSub):
# Input signal
inData = wgData[b * Ndft : (b + 1) * Ndft]
# Filter to downsample time signal
pfsDownData[b] = pfs.filter_block(inData)
pfsDownData[b] = pfs.filter_block(inData, flipped=False)
# Frequency domain data
# . The order of the pfs polyphases 0 : Ndft-1 fits the input order for FFT (and IFFT)
if analysisFFT:
dftData[b] = np.fft.fft(pfsDownData[b]) # LOFAR
else:
dftData[b] = Ndft * np.fft.ifft(pfsDownData[b]) # [HARRIS Fig 6.14, 6.21]
```
%% Cell type:code id:4d046175 tags:
``` python
# Plot subband spectra
spectra = np.zeros((NSimSub, Nsub))
for b in range(NSimSub):
subbands = dftData[b][0:Nsub]
spectra[b] = pow_db(np.abs(subbands))
plt.figure(1)
plt.imshow(spectra, aspect='auto')
plt.title('Power spectra in dB')
plt.xlabel('f [fsub = ' + str(fsub) + ']')
plt.ylabel('t [tsub = ' + str(tsub) + ']')
xstart = 0
xend = Nsub
if wgType == 'chirp':
xend = min(chirpStartSub + chirpNofSub + 100, Nsub - 1)
plt.xlim((xstart, xend))
plot_colorbar(plt)
plt.figure(2)
fn = np.arange(0, Nsub) / Nsub / 2
Nb = 2 # number of spectra to plot
if wgType == 'chirp':
if Nb > chirpNofSub:
Nb = chirpNofSub
for b in range(0, NSimSub, int(NSimSub / Nb)):
subbands = dftData[b][0:Nsub]
plot_power_spectrum(fn, subbands, fs=Nsub * 2)
xstart = 0
if wgType == 'chirp':
xend = min(chirpStartSub + chirpNofSub + 10, Nsub -1)
plt.xlim((xstart, xend))
```
%% Output
%% Cell type:markdown id:f39d83a6 tags:
# 4 Synthesis filterbank
%% Cell type:code id:c8d0560c tags:
``` python
pfs.reset_poly_delays()
pfsUpData = np.zeros((NSimSub, Ndft))
reconstructedData = np.zeros(NSimSub * Ndft)
# idft + pfir
for b in range(NSimSub):
# Frequency domain data
freqData = dftData[b]
# Time domain data: timeData = pfsDownData
if analysisFFT:
timeData = Ndft * np.fft.ifft(freqData) # LOFAR
else:
timeData = np.fft.fft(freqData) # [HARRIS Fig 7.16]
timeData = np.fft.fft(freqData) # [HARRIS Fig 7.16 has ifft]
# Check that domain data is real
if np.max(np.abs(timeData.imag)) > atol:
plt.plot(timeData.imag)
break;
# Filter to upsample time domain data
pfsUpData[b] = Ndft * pfs.filter_block(timeData.real)
pfsUpData[b] = Ndft * pfs.filter_block(timeData.real, flipped=False)
# Reconstructed time signal is the pfs output from the Ndft polyphases.
# . For upsampling the commutator selects the pfs polyphases 0 : Ndft-1,
# which fits the data output order in time
reconstructedData[b * Ndft : (b + 1) * Ndft] = pfsUpData[b]
```
%% Cell type:code id:e01f671d tags:
``` python
# Group delay of output signal
if firType == 'rect':
groupDelay = 0 # because only first tap of Ntaps is none zero
else:
groupDelay = (Ncoefs - Ndft) / 2
groupDelay *= 2 # for hDown and hUp
print('groupDelay = ' + str(groupDelay) + ' samples')
intGroupDelay = int(groupDelay)
```
%% Output
groupDelay = 112.0 samples
%% Cell type:code id:c0573913 tags:
``` python
# Compare reconstructed output with input
tt = t[0 : NSimSamples - intGroupDelay]
inpData = wgData[0 : NSimSamples - intGroupDelay]
inpMarkers = wgMarkers[0 : NSimSamples - intGroupDelay]
outData = reconstructedData[intGroupDelay :]
diffData = inpData - outData
diffDataMax = np.max(np.abs(diffData))
SNRdb = pow_db(np.std(wgData) / np.std(diffData))
# Expected SNR = 11.96 dB for Ntap = 8, Ndft = 16, 'firwin', hpFactor = 1.0, 'noise'
print('SNRdb = %.2f [dB]' % SNRdb)
if SNRdb < 100:
plt.figure(1, figsize=(11, 8))
plt.title('Reconstructed signal for %s' % wgType)
plt.plot(tt, inpData, 'r')
plt.plot(tt, outData, 'g')
plt.plot(tt, inpMarkers, 'b--')
# use loc='lower center' to avoid time UserWarning on default loc='best' in case of large len(t)
plt.legend(['wgData', 'reconstructedData', 'wgMarkers'], loc='lower center')
x = tStep * 1
dx = tStep * 0.1
dStep = tStep * 1
plt.xlim((x - dx, x + dx + dStep))
plt.xlabel('t [ts = ' + str(ts) + ']')
plt.ylabel('voltage')
plt.grid(True)
plt.figure(2, figsize=(11, 8))
plt.title('Difference signal for %s' % wgType)
plt.plot(tt, diffData, 'm')
plt.plot(tt, inpMarkers, 'b--')
# use loc='lower center' to avoid time UserWarning on default loc='best' in case of large len(t)
plt.legend(['diffData', 'wgMarkers'], loc='lower center')
x = tStep * 1
dx = tStep * 0.1
dStep = tStep * 1
#plt.xlim((x - dx, x + dx + dStep))
plt.xlabel('t [ts = ' + str(ts) + ']')
plt.ylabel('voltage')
plt.grid(True)
```
%% Output
SNRdb = 11.96 [dB]
%% Cell type:code id:8bd92167 tags:
``` python
```
Loading