Skip to content
Snippets Groups Projects
Commit f79503a0 authored by Eric Kooistra's avatar Eric Kooistra
Browse files

Merge branch 'master' into L2SDP-885b

parents 221436dd 656ea6f0
Branches
No related tags found
1 merge request!412Resolve L2SDP-885 "B"
......@@ -175,13 +175,20 @@ b) z-transform [LYONS 6.3, MATLAB]
- Causal systems are 0 for n < 0, so sum from n = 0 yields one sided z-transform
- For r = 1 the z-transform yields the DTFT, so DTFT evaluates the
z-transform on the unit circle.
- Properties [PROAKIS 3.3]:
. shift property: x[n - k] <--> X(z) z^-k
. time reversal: x[-n] <--> X(z^-1]
. convolution: x[n] * y[n] <--> X(z) Y(z)
. conjugation: x*[n] <--> X*(z*)
. real part: Re{x[n]} <--> 1/2 [X(z) + X*(z*)]
. imag part: Im{x[n]} <--> 1/2 [X(z) - X*(z*)]
- Examples:
. wire: H(z) = 1
. delay: H(z) = z^-1, so x[n - 1] <--> X(z) z^-1 (shift property)
. integrator: d[n] pulse at n = 0 --> u[n] step,
Y(z) = 1 + z^-1 + z^-2 + ..., multiply by (z - 1) / (z - 1)
= z / (z - 1) = 1 / (1 - z^-1)
. difference equation
. difference equation <--> rational z-transform
y[n] + 0.5 y[n-1] = u[n] --> Y(z) + 0.5 z^-1 Y(z) = U(z)
H(z) = Y(z) / U(Z) = 1 / (1 + 0.5 z^-1)
......
This diff is collapsed.
......@@ -68,7 +68,8 @@ end
% initial filter length
M1=N*L/Q;
% pass bandwidth
% pass bandwidth, w_pb, is the normalized cutoff frequency in the range between 0 and 1 (where 1 corresponds to the
% Nyquist frequency (as defined in fircls1)
w_pb = Q * BWchan;
% compute initial filter
......
......@@ -30,7 +30,7 @@
import numpy as np
from scipy import signal
from .utilities import ceil_div, is_even, is_symmetrical
from .fourier import fourier_interpolate
from .fourier import fourier_interpolate, dtft, estimate_gain_at_frequency
###############################################################################
......@@ -161,7 +161,7 @@ def ideal_low_pass_filter(Npoints, Npass, bandEdgeGain=1.0):
def design_fir_low_pass_filter(method,
Ncoefs, fpass, fstop, fcutoff=0, cutoffGain=0.5, rippleWeights=[1, 1],
kaiserBeta=0, fs=1.0,
cutoffBW=0):
cutoffBW=0, verbosity=1):
"""Derive FIR coefficients for low pass filter
Use method 'firls' or 'remez', fs = 1.0
......@@ -196,6 +196,7 @@ def design_fir_low_pass_filter(method,
- kaiserBeta: When kaiserBeta > 0, then additionally apply a Kaiser window on FIR
coefficients
- cutoffBW: workaround bandwidth at fcutoff for firls, because firls does not support zero BW at fcutoff.
- verbosity: when > 0 print() status, else no print()
Return:
- h: FIR coefficients for requested Ncoefs
"""
......@@ -205,23 +206,22 @@ def design_fir_low_pass_filter(method,
# Initial FIR filter length N
# . Use interpolation of factor Q shorter filter to ensure the FIR filter design converges
# and to speed up calculation. N >> NFirlsMax = 1024 is not feasible.
# and to speed up calculation. N >> NcoefsFirlsMax = 1024 is not feasible.
# . The passband ripple and stopband attenuation depend on the transition bandwidth
# and the rippleWeights. Choose transition band width ~< fpass, to ensure the FIR filter
# design converges and improve the passband ripple and stopband attenuation. A too large
# transition bandwidth also gives the design too much freedom and causes artifacts in
# the transition band.
# . scipy firls() defines fpass relative to fs, so can use fpass as cutoff frequency.
if method == 'firls':
NFirlsMax = 1024
Q = ceil_div(Ncoefs, NFirlsMax)
NcoefsFirlsMax = 1024
Q = ceil_div(Ncoefs, NcoefsFirlsMax)
N = Ncoefs // Q
# If necessary -1 to make odd, because firls only supports odd number of FIR coefficients
if is_even(N):
N = N - 1
if method == 'remez':
NRemezMax = 512
Q = ceil_div(Ncoefs, NRemezMax)
NcoefsRemezMax = 512
Q = ceil_div(Ncoefs, NcoefsRemezMax)
N = Ncoefs // Q
# Low pass transition band
......@@ -267,9 +267,10 @@ def design_fir_low_pass_filter(method,
# Use Fourier interpolation to create final FIR filter coefs when
# Q > 1 or when N != Ncoefs
HFfir = np.fft.fft(hFir)
h = fourier_interpolate(HFfir, Ncoefs)
h = fourier_interpolate(HFfir, Ncoefs, verbosity=0)
print('> design_fir_low_pass_filter()')
if verbosity:
print('> design_fir_low_pass_filter():')
print('. Method = %s' % method)
print('. Q = %f' % Q)
print('. fpass = %f' % fpass)
......@@ -287,6 +288,95 @@ def design_fir_low_pass_filter(method,
return h
def design_fir_low_pass_filter_adjust(method,
Ncoefs, fpass, fstop, fcutoff, cutoffGain=0.5, rippleWeights=[1, 1],
kaiserBeta=0, fs=1.0, resolution=1e-4, verbosity=1):
"""Derive FIR coefficients for low pass filter for exact fcutoff
Uses design_fir_low_pass_filter() but achieves cutoffGain at fcutoff by adjusting fpass or fstop, and
fpass < fcutoff < fstop.
"""
# Determine whether to adjust fpass or fstop
nofIterations = 1
nofIterationsMax = 1000
h = design_fir_low_pass_filter(method, Ncoefs, fpass, fstop, rippleWeights=rippleWeights,
kaiserBeta=kaiserBeta, fs=fs, verbosity=0)
_, fn, HF = dtft(h)
f = fn * fs
fIndex, fValue, fGain = estimate_gain_at_frequency(f, HF, fcutoff)
if fGain < cutoffGain:
adjustBand = 0 # increase fpass
adjustBW = fcutoff - fpass
else:
adjustBand = 1 # decrease fstop
adjustBW = fstop - fcutoff
# Iterate to achieve cutoffGain at fcutoff
fRadix = 16
fStep = adjustBW / fRadix
gainResolution = cutoffGain * resolution
FP = fpass
FS = fstop
result = False
while True:
# Calculate iteration
nofIterations += 1
h = design_fir_low_pass_filter(method, Ncoefs, FP, FS, rippleWeights=rippleWeights,
kaiserBeta=kaiserBeta, fs=fs, verbosity=0)
_, fn, HF = dtft(h)
fIndex, fValue, fGain = estimate_gain_at_frequency(fn, HF, fcutoff)
# print('DEBUG: FP, FS, fIndex, fValue, fGain = %s' % str((FP, FS, fIndex, fValue, fGain)))
if np.abs(fGain - cutoffGain) < gainResolution:
result = True
break
if nofIterations > nofIterationsMax:
print('ERROR: Too many iterations.')
break
# Prepare next iteration
if adjustBand == 0:
if fGain < cutoffGain:
# Keep on increasing FP
FP += fStep
# print('DEBUG: Increase FP')
# If necessary increase FS to keep fcutoff, this can occur when rippleWeights for stop band >
# rippleWeights for pass band
if FP >= fcutoff:
# print('DEBUG: Increment FS')
FP = fpass
FS += fStep
else:
# FP too high, one fStep back and increment FP with reduced fStep
FP -= fStep
fStep /= fRadix
FP += fStep
else:
if fGain > cutoffGain:
# Keep on decreasing FS
FS -= fStep
# print('DEBUG: Decrease FS')
else:
# FS too low, one fStep back and decrement FS with reduced fStep
FS += fStep
fStep /= fRadix
FS -= fStep
# Return result
if result:
if verbosity:
print('> design_fir_low_pass_filter_adjust():')
print('. nofIterations = %d' % nofIterations)
print('. fpass, fcutoff, stop = %s' % str((fpass, fcutoff, fstop)))
print('. FP, fcutoff, FS = %s' % str((FP, fcutoff, FS)))
print('. fcutoff / fpass = %f' % (fcutoff / fpass))
print('. fstop / fcutoff = %f' % (fstop / fcutoff))
print('. FP / fpass = %f' % (FP / fpass))
print('. FS / fstop = %f' % (FS / fstop))
return h
else:
print('ERROR: Return None.')
return None
def prototype_fir_low_pass_filter(method='firls',
Npoints=1024, Ntaps=16, Ncoefs=1024*16,
hpFactor=0.9, transitionFactor=0.4, stopRippleFactor=1000000, kaiserBeta=0, fs=1.0):
......@@ -344,12 +434,22 @@ def prototype_fir_low_pass_filter(method='firls',
###############################################################################
# Filterbank transfer function for prototype LPF
# Filterbank
###############################################################################
def derive_filterbank_transfer(HFprototype, Npoints):
Lprototype = len(HFprototype)
Lbin = Lprototype / Npoints
def filterbank_aggregate_transfer(HFprototype, Npoints):
"""Aggregate frequency reponse of filterbank
Sum of bandpass responses for all Npoints frequency bins in a filterbank.
N-1
HFbank = sum HF_k(w), with HF_k(w) = H0(w - 2pi/N k)
k=0
and H0(w) is the prototype LPF [PROAKIS Eq. 10.9.2].
"""
Lprototype = len(HFprototype) # corresponds to 2 pi
Lbin = Lprototype / Npoints # corresponds to 2 pi / N
HFbank = np.zeros(Lprototype, dtype='cfloat')
for bi in range(Npoints):
HFbank += np.roll(HFprototype, bi * Lbin)
......
......@@ -34,7 +34,7 @@ from .utilities import c_rtol, c_atol, ceil_pow2, is_even
# Time domain interpolation using DFT
###############################################################################
def fourier_interpolate(HFfilter, Ncoefs):
def fourier_interpolate(HFfilter, Ncoefs, verbosity=1):
"""Use Fourier interpolation to create final FIR filter coefs.
HF contains filter transfer function for N points, in order 0 to fs. The
......@@ -82,6 +82,7 @@ def fourier_interpolate(HFfilter, Ncoefs):
# K + 1 + K = N values, because N is odd and K = N // 2
hInterpolated = np.fft.ifft(HFextended)
if np.allclose(hInterpolated.imag, np.zeros(Ncoefs), rtol=c_rtol, atol=c_atol):
if verbosity:
print('hInterpolated.imag ~= 0')
else:
print('WARNING: hInterpolated.imag != 0 (max(abs) = %e)' % np.max(np.abs(hInterpolated.imag)))
......@@ -110,7 +111,7 @@ def dtft(coefs, Ndtft=None, zeroCenter=True, fftShift=True):
else use 0 to 1.0.
Return:
. h: zero padded coefs
. f: normalized frequency axis for HF, fs = 1
. fn: normalized frequency axis for HF, fs = 1
. HF: dtft(h), the frequency transfer function of h
"""
c_interpolate = 10
......@@ -125,12 +126,12 @@ def dtft(coefs, Ndtft=None, zeroCenter=True, fftShift=True):
# DFT
HF = np.fft.fft(h)
# Normalized frequency axis, fs = 1, ws = 2 pi
f = np.arange(0, 1, 1 / Ndtft) # f = 0,pos, neg
fn = np.arange(0, 1, 1 / Ndtft) # fn = 0,pos, neg
if fftShift:
# FFT shift to center HF, f = neg, 0,pos
f = f - 0.5
# FFT shift to center HF, fn = neg, 0,pos
fn = fn - 0.5
HF = np.roll(HF, Ndtft // 2)
return h, f, HF
return h, fn, HF
###############################################################################
......@@ -141,7 +142,8 @@ def estimate_gain_at_frequency(f, HF, freq):
"""Find gain = abs(HF) at frequency in f that is closest to freq.
Input:
. f, HF: frequency axis and spectrum as from dtft() with fftShift=True
. f: frequency axis as f = fn * fs for fn from dtft() with fftShift=True
. HF: spectrum as from dtft() with fftShift=True
. freq : frequency to look for in f
Return:
. fIndex: index of frequency in f that is closest to freq
......@@ -158,19 +160,26 @@ def estimate_gain_at_frequency(f, HF, freq):
def estimate_frequency_at_gain(f, HF, gain):
"""Find positive frequency in f that has gain = abs(HF) closest to gain.
Look only in positive frequencies and from highest frequency to lowest
frequency to avoid band pass ripple gain variation in LPF.
Input:
. f, HF: frequency axis and spectrum as from dtft() with fftShift=True
. f: frequency axis as f = fn * fs for fn from dtft() with fftShift=True
. HF: spectrum as from dtft() with fftShift=True
. gain : gain to look for in HF
Return:
. fIndex: index of frequency in f that has abs(HF) closest to gain
. fValue: frequency at fIndex
. fGain : gain = abs(HF) at fIndex
"""
# Look only in positive frequencies
pos = len(f[f <= 0])
HFpos = HF[pos:]
HFgain = np.abs(HFpos)
HFdiff = np.abs(HFgain - gain)
fIndex = pos + HFdiff.argmin()
# Flip to avoid detections in LPF band pass
HFdiff = np.flipud(HFdiff)
fIndex = pos + len(HFpos) - 1 - HFdiff.argmin()
fValue = f[fIndex]
fGain = np.abs(HF[fIndex])
return fIndex, fValue, fGain
......
......@@ -57,7 +57,7 @@ def plot_time_response(h, title='', color='r', markers=False):
plt.grid(True)
def plot_iir_filter_analysis(b, a, fs=1, whole=False, Ntime=100, step=False, log=False, show=[]):
def plot_iir_filter_analysis(b, a, fs=1.0, whole=False, Ntime=100, step=False, log=False, show=[]):
"""Plot and print iir filter analysis results.
Input:
......@@ -137,25 +137,22 @@ def plot_iir_filter_analysis(b, a, fs=1, whole=False, Ntime=100, step=False, log
return z, p, k
def plot_spectra(f, HF, fs=1.0, fLim=None, dbLim=None):
def plot_spectra(fn, HF, fs=1.0, fLim=None, dbLim=None):
"""Plot spectra for power, magnitude, phase, real, imag
Input:
. f: normalized frequency axis for HF (fs = 1)
. fn: normalized frequency axis for HF (fs = 1)
. HF: spectrum, e.g. frequency transfer function HF = DTFT(h)
. fs: sample frequency in Hz, scale f by fs, fs >= 1
. fs: sample frequency in Hz
"""
Hmag = np.abs(HF)
Hphs = np.unwrap(np.angle(HF))
Hpow_dB = pow_db(HF) # power response
fn = f * fs
if fs > 1:
flabel = 'Frequency [fs / %d]' % fs
else:
flabel = 'Frequency [fs]'
f = fn * fs # scale fn by fs
flabel = 'Frequency [fs = %f]' % fs
plt.figure(1)
plt.plot(fn, Hpow_dB)
plt.plot(f, Hpow_dB)
plt.title('Power spectrum')
plt.ylabel('Power [dB]')
plt.xlabel(flabel)
......@@ -166,8 +163,8 @@ def plot_spectra(f, HF, fs=1.0, fLim=None, dbLim=None):
plt.grid(True)
plt.figure(2)
plt.plot(fn, HF.real, 'r')
plt.plot(fn, HF.imag, 'g')
plt.plot(f, HF.real, 'r')
plt.plot(f, HF.imag, 'g')
plt.title('Complex voltage spectrum')
plt.ylabel('Voltage')
plt.xlabel(flabel)
......@@ -177,7 +174,7 @@ def plot_spectra(f, HF, fs=1.0, fLim=None, dbLim=None):
plt.grid(True)
plt.figure(3)
plt.plot(fn, Hmag)
plt.plot(f, Hmag)
plt.title('Magnitude spectrum') # = amplitude
plt.ylabel('Voltage')
plt.xlabel(flabel)
......@@ -186,7 +183,7 @@ def plot_spectra(f, HF, fs=1.0, fLim=None, dbLim=None):
plt.grid(True)
plt.figure(4)
plt.plot(fn, Hphs)
plt.plot(f, Hphs)
plt.title('Phase spectrum (note -1: pi = -pi)')
plt.ylabel('Phase [rad]')
plt.xlabel(flabel)
......@@ -195,18 +192,19 @@ def plot_spectra(f, HF, fs=1.0, fLim=None, dbLim=None):
plt.grid(True)
def plot_power_spectrum(f, HF, fmt='r', fs=1.0, fLim=None, dbLim=None):
def plot_power_spectrum(fn, HF, fmt='r', fs=1.0, fLim=None, dbLim=None):
"""Plot power spectrum
Input:
. f: normalized frequency axis for HF (fs = 1)
. fn: normalized frequency axis for HF (fs = 1)
. HF: spectrum, e.g. frequency transfer function HF = DTFT(h)
. fmt: curve format string
. fs: sample frequency in Hz, scale f by fs, fs >= 1
. fs: sample frequency in Hz
"""
f = fn * fs # scale fn by fs
flabel = 'Frequency [fs = %f]' % fs
plt.plot(f * fs, pow_db(HF), fmt)
plt.plot(f, pow_db(HF), fmt)
plt.title('Power spectrum')
plt.ylabel('Power [dB]')
plt.xlabel(flabel)
......@@ -217,18 +215,19 @@ def plot_power_spectrum(f, HF, fmt='r', fs=1.0, fLim=None, dbLim=None):
plt.grid(True)
def plot_magnitude_spectrum(f, HF, fmt='r', fs=1.0, fLim=None, voltLim=None):
def plot_magnitude_spectrum(fn, HF, fmt='r', fs=1.0, fLim=None, voltLim=None):
"""Plot magnitude (= voltage) spectrum
Input:
. f: normalized frequency axis for HF (fs = 1)
. fn: normalized frequency axis for HF (fs = 1)
. HF: spectrum, e.g. frequency transfer function HF = DTFT(h)
. fmt: curve format string
. fs: sample frequency in Hz, scale f by fs, fs >= 1
. fs: sample frequency in Hz
"""
f = fn * fs # scale fn by fs
flabel = 'Frequency [fs = %f]' % fs
plt.plot(f * fs, np.abs(HF), fmt)
plt.plot(f, np.abs(HF), fmt)
plt.title('Magnitude spectrum')
plt.ylabel('Voltage')
plt.xlabel(flabel)
......@@ -239,28 +238,27 @@ def plot_magnitude_spectrum(f, HF, fmt='r', fs=1.0, fLim=None, voltLim=None):
plt.grid(True)
def plot_two_power_spectra(f1, HF1, name1, f2, HF2, name2, fs=1.0, fLim=None, dbLim=None, showRoll=False):
def plot_two_power_spectra(fn1, HF1, name1, fn2, HF2, name2, fs=1.0, fLim=None, dbLim=None, showRoll=False):
"""Plot two power spectra in same plot for comparison
Input:
. f1,f2: normalized frequency axis for HF1, HF2 (fs = 1)
. fn1,fn2: normalized frequency axis for HF1, HF2 (fs = 1)
. HF1, HF2: spectrum, e.g. frequency transfer function HF = DTFT(h)
. fs: sample frequency in Hz, scale f by fs, fs >= 1
. fs: sample frequency in Hz
"""
if fs > 1:
flabel = 'Frequency [fs / %d]' % fs
else:
flabel = 'Frequency [fs]'
f1 = fn1 * fs # scale fn1 by fs
f2 = fn2 * fs # scale fn1 by fs
flabel = 'Frequency [fs = %f]' % fs
if showRoll:
plt.plot(f1 * fs, pow_db(HF1), 'r',
f1 * fs, np.roll(pow_db(HF1), len(f1) // 2), 'r--',
f2 * fs, pow_db(HF2), 'b',
f2 * fs, np.roll(pow_db(HF2), len(f2) // 2), 'b--')
plt.plot(f1, pow_db(HF1), 'r',
f1, np.roll(pow_db(HF1), len(f1) // 2), 'r--',
f2, pow_db(HF2), 'b',
f2, np.roll(pow_db(HF2), len(f2) // 2), 'b--')
plt.legend([name1, '', name2, ''])
else:
plt.plot(f1 * fs, pow_db(HF1), 'r',
f2 * fs, pow_db(HF2), 'b')
plt.plot(f1, pow_db(HF1), 'r',
f2, pow_db(HF2), 'b')
plt.legend([name1, name2])
plt.title('Power spectrum')
plt.ylabel('Power [dB]')
......
......@@ -1671,6 +1671,13 @@ Dan zonder --host
Helaas verwacht het SST script dat de data op de lokale machine aankomt, maar hetkomt op dop421 aan. Dus voor offload scripts werkt lokaal draaien zo niet.
*******************************************************************************
* DTS-subrack WSRT
*******************************************************************************
http://195.169.63.101/jupyter/lab/tree/notebooks/aps-test-gui/apsgui/APSH0_test.ipynb
*******************************************************************************
* Flake8, black
*******************************************************************************
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment