diff --git a/applications/lofar2/model/rtdsp/firfilter.py b/applications/lofar2/model/rtdsp/firfilter.py index f1f620998365db8bd05764c23075263501bade8d..daa2d90926af16ce057eac64513a24a6189a7926 100644 --- a/applications/lofar2/model/rtdsp/firfilter.py +++ b/applications/lofar2/model/rtdsp/firfilter.py @@ -30,7 +30,7 @@ import numpy as np from scipy import signal from .utilities import ceil_div, is_even, is_symmetrical -from .fourier import fourier_interpolate +from .fourier import fourier_interpolate, dtft, estimate_gain_at_frequency ############################################################################### @@ -161,7 +161,7 @@ def ideal_low_pass_filter(Npoints, Npass, bandEdgeGain=1.0): def design_fir_low_pass_filter(method, Ncoefs, fpass, fstop, fcutoff=0, cutoffGain=0.5, rippleWeights=[1, 1], kaiserBeta=0, fs=1.0, - cutoffBW=0): + cutoffBW=0, verbosity=True): """Derive FIR coefficients for low pass filter Use method 'firls' or 'remez', fs = 1.0 @@ -196,6 +196,7 @@ def design_fir_low_pass_filter(method, - kaiserBeta: When kaiserBeta > 0, then additionally apply a Kaiser window on FIR coefficients - cutoffBW: workaround bandwidth at fcutoff for firls, because firls does not support zero BW at fcutoff. + - verbosity: when > 0 print() status, else no print() Return: - h: FIR coefficients for requested Ncoefs """ @@ -205,23 +206,22 @@ def design_fir_low_pass_filter(method, # Initial FIR filter length N # . Use interpolation of factor Q shorter filter to ensure the FIR filter design converges - # and to speed up calculation. N >> NFirlsMax = 1024 is not feasible. + # and to speed up calculation. N >> NcoefsFirlsMax = 1024 is not feasible. # . The passband ripple and stopband attenuation depend on the transition bandwidth # and the rippleWeights. Choose transition band width ~< fpass, to ensure the FIR filter # design converges and improve the passband ripple and stopband attenuation. A too large # transition bandwidth also gives the design too much freedom and causes artifacts in # the transition band. - # . scipy firls() defines fpass relative to fs, so can use fpass as cutoff frequency. if method == 'firls': - NFirlsMax = 1024 - Q = ceil_div(Ncoefs, NFirlsMax) + NcoefsFirlsMax = 1024 + Q = ceil_div(Ncoefs, NcoefsFirlsMax) N = Ncoefs // Q # If necessary -1 to make odd, because firls only supports odd number of FIR coefficients if is_even(N): N = N - 1 if method == 'remez': - NRemezMax = 512 - Q = ceil_div(Ncoefs, NRemezMax) + NcoefsRemezMax = 512 + Q = ceil_div(Ncoefs, NcoefsRemezMax) N = Ncoefs // Q # Low pass transition band @@ -269,21 +269,99 @@ def design_fir_low_pass_filter(method, HFfir = np.fft.fft(hFir) h = fourier_interpolate(HFfir, Ncoefs) - print('> design_fir_low_pass_filter()') - print('. Method = %s' % method) - print('. Q = %f' % Q) - print('. fpass = %f' % fpass) - print('. fstop = %f' % fstop) - print('. lpBW = %f' % lpBW) - if fcutoff: - print('. fcutoff = %f' % fcutoff) - print('. cutoffGain = %f' % cutoffGain) - print('. fNyquist = %f' % fNyquist) - print('. fs = %f' % fs) - print('. Ncoefs = %d' % len(h)) - print('. DC sum = %f' % np.sum(h)) - print('. Symmetrical coefs = %s' % is_symmetrical(h)) - print('') + if verbosity: + print('> design_fir_low_pass_filter():') + print('. Method = %s' % method) + print('. Q = %f' % Q) + print('. fpass = %f' % fpass) + print('. fstop = %f' % fstop) + print('. lpBW = %f' % lpBW) + if fcutoff: + print('. fcutoff = %f' % fcutoff) + print('. cutoffGain = %f' % cutoffGain) + print('. fNyquist = %f' % fNyquist) + print('. fs = %f' % fs) + print('. Ncoefs = %d' % len(h)) + print('. DC sum = %f' % np.sum(h)) + print('. Symmetrical coefs = %s' % is_symmetrical(h)) + print('') + return h + + +def design_fir_low_pass_filter_adjust(method, + Ncoefs, fpass, fstop, fcutoff, cutoffGain=0.5, rippleWeights=[1, 1], + kaiserBeta=0, fs=1.0, resolution=1e-4, verbosity=1): + """Derive FIR coefficients for low pass filter for exact fcutoff + + Uses design_fir_low_pass_filter() but achieves cutoffGain at fcutoff by adjusting fpass or fstop, and + fpass < fcutoff < fstop. + """ + # Determine whether to adjust fpass or fstop + nofIterations = 1 + nofIterationsMax = 100 + h = design_fir_low_pass_filter(method, Ncoefs, fpass, fstop, rippleWeights=rippleWeights, + kaiserBeta=kaiserBeta, fs=fs, verbosity=0) + _, fn, HF = dtft(h) + f = fn * fs + fIndex, fValue, fGain = estimate_gain_at_frequency(f, HF, fcutoff) + if fGain < cutoffGain: + adjustBand = 0 # increase fpass + adjustBW = fcutoff - fpass + else: + adjustBand = 1 # decrease fstop + adjustBW = fstop - fcutoff + + # Iterate to achieve cutoffGain at fcutoff + fRadix = 32 + fStep = adjustBW / fRadix + gainResolution = cutoffGain * resolution + FP = fpass + FS = fstop + while True: + prevFP = FP + prevFS = FS + if adjustBand == 0: + if fGain < cutoffGain: + # Keep on increasing FP + FP += fStep + else: + # FP too high, one fStep back and increment FP with reduced fStep + FP -= fStep + fStep /= fRadix + FP += fStep + else: + if fGain > cutoffGain: + # Keep on decreasing FS + FS -= fStep + else: + # FS too low, one fStep back and decrement FS with reduced fStep + FS += fStep + fStep /= fRadix + FS -= fStep + if np.abs(fGain - cutoffGain) < gainResolution: + break + if nofIterations > nofIterationsMax: + break + nofIterations += 1 + h = design_fir_low_pass_filter(method, Ncoefs, FP, FS, rippleWeights=rippleWeights, + kaiserBeta=kaiserBeta, fs=fs, verbosity=0) + _, fn, HF = dtft(h) + fIndex, fValue, fGain = estimate_gain_at_frequency(f, HF, fcutoff) + + if verbosity: + print('> design_fir_low_pass_filter_adjust():') + print('. nofIterations = %d' % nofIterations) + print('. fpass, fcutoff, stop = %s' % str((fpass, fcutoff, fstop))) + print('. FP, fcutoff, FS = %s' % str((FP, fcutoff, FS))) + print('. fcutoff / fpass = %f' % (fcutoff / fpass)) + print('. fstop / fcutoff = %f' % (fstop / fcutoff)) + print('. FP / fpass = %f' % (FP / fpass)) + print('. FS / fstop = %f' % (FS / fstop)) + # Rerun with same parameters to get logging + h = design_fir_low_pass_filter(method, Ncoefs, prevFP, prevFS, rippleWeights=rippleWeights, + kaiserBeta=kaiserBeta, fs=fs) + if nofIterations > nofIterationsMax: + return None return h @@ -344,12 +422,22 @@ def prototype_fir_low_pass_filter(method='firls', ############################################################################### -# Filterbank transfer function for prototype LPF +# Filterbank ############################################################################### -def derive_filterbank_transfer(HFprototype, Npoints): - Lprototype = len(HFprototype) - Lbin = Lprototype / Npoints +def filterbank_aggregate_transfer(HFprototype, Npoints): + """Aggregate frequency reponse of filterbank + + Sum of bandpass responses for all Npoints frequency bins in a filterbank. + + N-1 + HFbank = sum HF_k(w), with HF_k(w) = H0(w - 2pi/N k) + k=0 + + and H0(w) is the prototype LPF [PROAKIS Eq. 10.9.2]. + """ + Lprototype = len(HFprototype) # corresponds to 2 pi + Lbin = Lprototype / Npoints # corresponds to 2 pi / N HFbank = np.zeros(Lprototype, dtype='cfloat') for bi in range(Npoints): HFbank += np.roll(HFprototype, bi * Lbin)