diff --git a/applications/lofar2/model/pfb_os/multirate_mixer.ipynb b/applications/lofar2/model/pfb_os/multirate_mixer.ipynb index 1159c80928509170726c02f3da8dead030315470..1ce67dffc46901c690a9b684526fa3aafad6912a 100644 --- a/applications/lofar2/model/pfb_os/multirate_mixer.ipynb +++ b/applications/lofar2/model/pfb_os/multirate_mixer.ipynb @@ -73,10 +73,11 @@ " read_coefficients_file\n", "from rtdsp.firfilter import filterbank_frequency_response\n", "from rtdsp.fourier import dtft\n", - "from rtdsp.multirate import down, up, unit_circle_loops_phasor_arr, \\\n", - " maximal_downsample_bpf, non_maximal_downsample_bpf, \\\n", - " maximal_upsample_bpf, non_maximal_upsample_bpf, \\\n", - " analysis_dft_filterbank, synthesis_dft_filterbank\n", + "from rtdsp.multirate import down, up\n", + "from rtdsp.singlechannel import unit_circle_loops_phasor_arr, \\\n", + " maximal_downsample_bpf, non_maximal_downsample_bpf, \\\n", + " maximal_upsample_bpf, non_maximal_upsample_bpf\n", + "from rtdsp.dftfilterbank import analysis_dft_filterbank, synthesis_dft_filterbank\n", "from rtdsp.plotting import plot_power_spectrum, plot_magnitude_spectrum, plot_phase_spectrum" ] }, @@ -520,7 +521,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "/tmp/ipykernel_11899/4263550011.py:2: UserWarning: The filter's denominator is extremely small at frequencies [0.270, 0.295, 0.319, 0.344, 0.368, 0.393, 0.417, 0.442, 0.466, 0.491, 0.515, 0.540, 0.565, 0.589, 0.614, 0.638, 0.663, 0.687, 0.712, 0.736, 0.761, 0.785, 0.810, 0.834, 0.859, 0.884, 0.908, 0.933, 0.957, 0.982, 1.006, 1.031, 1.055, 1.080, 1.104, 1.129, 1.154, 1.178, 1.203, 1.227, 1.252, 1.276, 1.301, 1.325, 1.350, 1.374, 1.399, 1.424, 1.448, 1.473, 1.497, 1.522, 1.546, 1.571, 1.595, 1.620, 1.644, 1.669, 1.694, 1.718, 1.743, 1.767, 1.792, 1.816, 1.841, 1.865, 1.890, 1.914, 1.939, 1.963, 1.988, 2.013, 2.037, 2.062, 2.086, 2.111, 2.135, 2.160, 2.184, 2.209, 2.233, 2.258, 2.283, 2.307, 2.332, 2.356, 2.381, 2.405, 2.430, 2.454, 2.479, 2.503, 2.528, 2.553, 2.577, 2.602, 2.626, 2.651, 2.675, 2.700, 2.724, 2.749, 2.773, 2.798, 2.823, 2.847, 2.872, 2.896, 2.921, 2.945, 2.970, 2.994, 3.019, 3.043, 3.068, 3.093, 3.117], around which a singularity may be present\n", + "/tmp/ipykernel_12022/4263550011.py:2: UserWarning: The filter's denominator is extremely small at frequencies [0.270, 0.295, 0.319, 0.344, 0.368, 0.393, 0.417, 0.442, 0.466, 0.491, 0.515, 0.540, 0.565, 0.589, 0.614, 0.638, 0.663, 0.687, 0.712, 0.736, 0.761, 0.785, 0.810, 0.834, 0.859, 0.884, 0.908, 0.933, 0.957, 0.982, 1.006, 1.031, 1.055, 1.080, 1.104, 1.129, 1.154, 1.178, 1.203, 1.227, 1.252, 1.276, 1.301, 1.325, 1.350, 1.374, 1.399, 1.424, 1.448, 1.473, 1.497, 1.522, 1.546, 1.571, 1.595, 1.620, 1.644, 1.669, 1.694, 1.718, 1.743, 1.767, 1.792, 1.816, 1.841, 1.865, 1.890, 1.914, 1.939, 1.963, 1.988, 2.013, 2.037, 2.062, 2.086, 2.111, 2.135, 2.160, 2.184, 2.209, 2.233, 2.258, 2.283, 2.307, 2.332, 2.356, 2.381, 2.405, 2.430, 2.454, 2.479, 2.503, 2.528, 2.553, 2.577, 2.602, 2.626, 2.651, 2.675, 2.700, 2.724, 2.749, 2.773, 2.798, 2.823, 2.847, 2.872, 2.896, 2.921, 2.945, 2.970, 2.994, 3.019, 3.043, 3.068, 3.093, 3.117], around which a singularity may be present\n", " w, gd = signal.group_delay((hPrototype, [1.0]), w=1024, fs=1)\n" ] }, @@ -693,7 +694,7 @@ { "data": { "text/plain": [ - "<matplotlib.legend.Legend at 0x7fb48438d7c0>" + "<matplotlib.legend.Legend at 0x7f3841835460>" ] }, "execution_count": 21, diff --git a/applications/lofar2/model/rtdsp/__init__.py b/applications/lofar2/model/rtdsp/__init__.py index 98a0c24244b3b768dace56da0b5608ae6d62f649..9de8a42e536ce8460b25446300e7ad1ce41bd27e 100644 --- a/applications/lofar2/model/rtdsp/__init__.py +++ b/applications/lofar2/model/rtdsp/__init__.py @@ -1,14 +1,18 @@ """Init file for the Radio Telescope Digital Signal Processing (RTDSP) package. Modules: -. utilities : Basic functions used in the other modules -. windows : Windows functions for filter design -. fourier : DFT based functions -. hilbert : Hilbert transform -. firfilter : FIR filter design -. iirfilter : IIR filter design -. multirate : Multirate functions for resampling -. plotting : Plotting impulse responses and spectra +. utilities : Basic functions used in the other modules +. windows : Windows functions for filter design +. fourier : DFT based functions +. hilbert : Hilbert transform +. firfilter : FIR filter design +. iirfilter : IIR filter design +. multirate : Multirate functions for resampling +. singlechannel : Single channel downconverter downsampler and single channel + upconverter upsampler, using LO +. dftfilterbank : Multi channel analysis filterbank and multi channel synthesis + filterbank, using DFT and IDFT +. plotting : Plotting impulse responses and spectra References: [1] dsp_study_erko.txt diff --git a/applications/lofar2/model/rtdsp/dftfilterbank.py b/applications/lofar2/model/rtdsp/dftfilterbank.py new file mode 100644 index 0000000000000000000000000000000000000000..f20d3d75d329253a2b4e23d706e7e2ccb463f911 --- /dev/null +++ b/applications/lofar2/model/rtdsp/dftfilterbank.py @@ -0,0 +1,334 @@ +#! /usr/bin/env python3 +############################################################################### +# +# Copyright 2024 +# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/> +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +############################################################################### + +# Author: Eric Kooistra +# Purpose: Multi channel analysis filterbank and multi channel synthesis +# filterbank, using DFT and IDFT +# Description: +# +# Usage: +# . Use [2] to verify this code. +# +# References: +# [1] dsp_study_erko.txt +# [2] pfb_os/multirate_mixer.ipynb +# +# Books: +# . LYONS +# . HARRIS +# . CROCHIERE + +import numpy as np +from sys import exit +from .utilities import ceil_div +from .multirate import fold, \ + PolyPhaseFirFilterStructure, \ + polyphase_data_for_downsampling_whole_x + + +def analysis_dft_filterbank(x, Ndown, Ndft, coefs, structure, commutator=None, verbosity=1): + """Analysis DFT filterbank with Ros = Ndft / Ndown. + + Implements WOLA structure for DFT filterbank [CROCHIERE Fig 7.19]. Key + steps: + + - Signal x has time index n. Mixer local oscillator (LO) and LPF and + downsampler M, yields the short-time spectrum of signal x at time n = mM, + so M is the block size or downsample rate [CROCHIERE Eq 7.65 = Eq 7.9, + HARRIS Eq 6.1] + - Change from fixed signal time frame n and sliding filter, to sliding time + frame r with fixed filter. This yields factor W_K^(kmM) [CROCHIERE Eq + 7.69], with K = Ndft. + - Assume filter h length is Ncoefs = Ntaps * K, for K frequency bins. The + h[-r] weights the signal at n = nM, so the DFT of the weighted signal has + Ncoefs bins k', but for the filterbank only bins k' = k Ntaps are needed. + This pruned DFT can be calculated by stacking the blocks of K samples of + the weighted signal and then calculating the K point DFT [CROCHIERE Eq + 7.73, Fig 7.19, BUNTON]. The stacking sum is like polyphase FIR with K + branches, and data is shifted in per block of M samples from x. + - The time (m) and bin (k) dependend phase shift W_K^(kmM) = + exp(-j 2pi k m M / K) of the DFT output is equivalent to a circular time + shift by l = (m M) % K samples of the DFT input. Circular time shift DFT + theorem [CROCHIERE Fig 7.21]: + x[n] <= DFT => X(k), then + x[(n - l) % K] <= DFT => X(k) exp(-j 2pi k l / K) + + Input: + . x: Input signal x[n] + . Ndown: downsample factor and input block size + . Ndft: DFT size, number of polyphases in PFS FIR filter + . coefs: prototype LPF FIR filter coefficients for anti aliasing BPF + . structure: 'wola' or 'pfs' + . commutator: only for 'pfs', counter clockwise 'ccw' to use IDFT or + clockwise 'cw' to use DFT. Both yield identical result Yc, because + IDFT(x) = 1/Ndft * DFT(fold(x)). + - verbosity: when > 0 print() status, else no print() + Return: + . Yc: Downsampled and downconverted output signal Yc[k, m], m = n // M for + all Ndft bins k. Complex baseband signals. + """ + Ros = Ndft / Ndown + + # Prepend x with Ndown - 1 zeros, and represent x in Nblocks of Ndown + # samples + Nzeros = Ndown - 1 + xBlocks, xData, Nx, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros) + + # Prepare output + Yc = np.zeros((Ndft, Nblocks), dtype='cfloat') + + # Oversampling analysis DFT filterbank + # . note: fold = roll -1 then flip = flip then roll +1 + if structure == 'wola': + # >>> Use WOLA structure + # Use PFS with Ndft polyphases and shift in xData in overlapping + # blocks of Ncoefs samples, to apply coefs as window. The coefs in + # the pfs are already in flipped order conform [CROCHIERE Eq 7.70]. + pfs = PolyPhaseFirFilterStructure(Ndft, coefs) + # print(pfs.coefs) + # print(pfs.polyCoefs) + + # Prepend xData with pfs.Ncoefs - Ndown zeros to align with 'pfs' + # implementation + xData = np.concatenate((np.zeros(pfs.Ncoefs - Ndown), xData)) + + # PFB loop per Ndown input xData samples + for b in range(Nblocks): + # Window + tRange = np.arange(pfs.Ncoefs) + b * Ndown + tData = xData[tRange] + pfs.shift_in_data(np.flip(tData)) + if b < 3: + # print(b) + # print(tData) + # print(pfs.polyDelays) + pass + zPoly = pfs.polyDelays * pfs.polyCoefs + # Sum pfs.Ntaps number of blocks + pfsData = np.sum(zPoly, axis=1) + # Time shift + tShift = b * Ndown # roll is modulo Ndft + # DFT + pfsShifted = np.roll(pfsData, -tShift) + # fold = roll -1 then flip + pfsShifted = fold(pfsShifted) + Yc[:, b] = np.fft.fft(pfsShifted) + if b < 3: + # print(b, pfs.map_to_delay_line(pfs.polyDelays)) + # print(b, np.flip(pfs.map_to_delay_line(zPoly))) + # print(b, np.flip(pfsData)) + # print(b, Yc[:, b]) + pass + + elif structure == 'bunton': + # >>> Use WOLA structure as in reconstruct.m by John Bunton + Ncoefs = len(coefs) + Ntaps = ceil_div(Ncoefs, Ndft) # taps, columns + Ndelays = Ntaps * Ndft # zero padded coefs + # Prepend xData with pfs.Ncoefs - Ndown zeros to align with 'pfs' + # implementation + xData = np.concatenate((np.zeros(Ncoefs - Ndown), xData)) + # Use coefs as window + wCoefs = coefs + wCoefs = np.flip(coefs) + # Time shift offset, due to roll -1 in fold() ?, to align with LO in + # single channel reference. + tOffset = 1 + # PFB loop per Ndown input xData samples + for b in range(Nblocks): + # Time shift + tShift = b * Ndown + # Apply window + tRange = np.arange(Ncoefs) + tShift + tData = xData[tRange] + zLine = np.zeros(Ndelays, dtype='float') + zLine[0:Ncoefs] = tData * wCoefs + # Sum Ntaps + sumLine = np.zeros(Ndft, dtype='cfloat') + for t in range(Ntaps): + sumLine += zLine[np.arange(Ndft) + t * Ndft] + # Fixed time reference, roll is modulo Ndft + sumShifted = np.roll(sumLine, tOffset + tShift) + # DFT + Yc[:, b] = np.fft.fft(sumShifted) + + elif structure == 'pfs': + # >>> Use PFS + # PFS with Ndft polyphases and shift in xBlocks of Ndown samples + pfs = PolyPhaseFirFilterStructure(Ndft, coefs) + # PFB loop per Ndown input xData samples + for b in range(Nblocks): + # Filter block, the inData blocks are already flipped in + # polyphase_data_for_downsampling_whole_x() + inData = xBlocks[:, b] + pfsData = pfs.filter_block(inData) + + # Time (m) and bin (k) dependend phase shift W_K^(kmM) by circular + # time shift of DFT input + tShift = b * Ndown # roll is modulo Ndft + pfsShifted = np.roll(pfsData, -tShift) + + # For 'ccw' apply IDFT, for 'cw' apply DFT(fold()), due to fold() + # both yield identical result + if commutator == 'cw': # DFT + # For 'cw' fold the pfsData to apply the DFT + pfsShifted = np.roll(pfsShifted, -1) + pfsShifted = np.flip(pfsShifted) + Yc[:, b] = np.fft.fft(pfsShifted) + elif commutator == 'ccw': # IDFT + # With 'ccw' keep the pfsData to apply IDFT + Yc[:, b] = np.fft.ifft(pfsShifted) * Ndft + else: + exit('ERROR: invalid commutator ' + str(commutator)) + else: + exit('ERROR: invalid structure ' + str(structure)) + + if verbosity: + print('> analysis_dft_filterbank():') + print(' . len(x) =', str(len(x))) + print(' . Nx =', str(Nx)) + print(' . Nblocks =', str(Nblocks)) + print(' . Ros =', str(Ros)) + print(' . Ndown =', str(Ndown)) + print(' . Ndft =', str(Ndft)) + print(' . structure =', structure) + if structure == 'pfs': + print(' . commutator =', commutator) + print('') + return Yc + + +def synthesis_dft_filterbank(Xbase, Nup, Ndft, coefs, structure, commutator=None, verbosity=1): + """Synthesis DFT filterbank with Ros = Ndft / Nup. + + Implements WOLA structure for DFT filterbank [CROCHIERE Fig 7.20]. Key + steps: + + - Signal Xbase has time index m and Ndft values. + + Input: + . Xbase: Complex baseband signals for Ndft bins, and Nblocks in time m. + . Nup: upsample factor and output block size + . Ndft: DFT size, number of polyphases in PFS FIR filter + . coefs: prototype LPF FIR filter coefficients for anti aliasing and + interpolating BPF + . structure: 'wola' or 'pfs' + . commutator: 'ccw' to use DFT or 'cw' to use IDFT + - verbosity: when > 0 print() status, else no print() + Return: + . y: Upsampled and upconverted output signal y[n]. + """ + Ros = Ndft / Nup + + # Xbase has Ndft rows and Nblocks columns + Nblocks = np.size(Xbase, axis=1) + + # Prepare output + Noutput = Nup * Nblocks + len(coefs) + y = np.zeros(Noutput, dtype='float') + + # Adjust LO phase for group delay of analysis LPF and synthesis LPF in + # series. This is not needed in practise, but is needed to be able to + # exactly compare reconstructed signal with original input time series + # signal. Assume coefs has group delay (len(coefs) - 1) / 2. + hPairGroupDelay = len(coefs) - 1 + + # Oversampling analysis DFT filterbank + if structure == 'wola': + # >>> Use WOLA structure + # Use PFS with Ndft polyphases for delay line and window + pfs = PolyPhaseFirFilterStructure(Ndft, coefs) + + # PFB loop per Nup output y samples + for b in range(Nblocks): + # For 'ccw' apply IDFT, for 'cw' apply DFT(fold()), due to fold() + # both # yield identical result + if commutator == 'cw': # DFT + # For 'cw' fold the Xbase to apply the DFT + xTime = np.real(np.fft.fft(fold(Xbase[:, b]))) + else: # 'ccw', IDFT + # With 'ccw' keep the Xbase to apply IDFT + xTime = Ndft * np.real(np.fft.ifft(Xbase[:, b])) + + # Apply filter group delay (hPairGroupDelay), and time (m) and bin + # (k) dependend phase shift W_K^(kmM) by circular time shift of + # DFT output + tShift = b * Nup - hPairGroupDelay # roll is modulo Ndft + xShifted = np.roll(xTime, -tShift) + + # Load xTime at each tap in polyDelays + pfs.parallel_load(xShifted) + + # Apply FIR coefs per delay element + zPoly = Nup * pfs.polyDelays * pfs.polyCoefs + zLine = pfs.map_to_delay_line(zPoly) + + # Overlap add weigthed input to the output + tRange = np.arange(pfs.Ndelays) + b * Nup + y[tRange] += zLine + if b < 3: + # print(b, Xbase[:, b]) + # print(b, xTime / Ndft) + # print(b, zLine / Ndft / Nup) + # print(b, y[tRange] / Ndft / Nup) + pass + + elif structure == 'bunton': + # >>> Use WOLA structure as in reconstruct.m by John Bunton + Ncoefs = len(coefs) + Ntaps = ceil_div(Ncoefs, Ndft) # taps, columns + Ndelays = Ntaps * Ndft # zero padded coefs + # Use coefs as window + wCoefs = coefs + # wCoefs = np.flip(coefs) + for b in range(Nblocks): + # IDFT + xTime = Ndft * np.real(np.fft.ifft(Xbase[:, b])) + # Sliding time reference + tShift = b * Nup - hPairGroupDelay # roll is modulo Ndft + xShifted = np.roll(xTime, -tShift) + # Copy data at Ntaps + zLine = np.zeros(Ndelays, dtype='float') + for t in range(Ntaps): + zLine[np.arange(Ndft) + t * Ndft] = xShifted + # Apply window + yLine = np.zeros(Ncoefs, dtype='float') + yLine = Nup * zLine[0:Ncoefs] * wCoefs + # Overlap add weigthed input to the output + tRange = np.arange(Ncoefs) + b * Nup + y[tRange] += yLine + + else: + # No 'pfs' for fractional oversampled synthesis, because the + # interpolator cannot be separated in independent branches + # then. Expect when Ros is integer [CROCHIERE 7.2.4]. + exit('ERROR: invalid structure ' + str(structure)) + + if verbosity: + print('> synthesis_dft_filterbank():') + print(' . Nblocks =', str(Nblocks)) + print(' . Ros =', str(Ros)) + print(' . Nup =', str(Nup)) + print(' . Ndft =', str(Ndft)) + print(' . structure =', structure) + print(' . commutator =', commutator) + print('') + return y diff --git a/applications/lofar2/model/rtdsp/multirate.py b/applications/lofar2/model/rtdsp/multirate.py index 4a0868fd234436ade41ac2b854c244d2f53d2ccc..3b0139e30aa500a603ac9565242ef90d0b278737 100644 --- a/applications/lofar2/model/rtdsp/multirate.py +++ b/applications/lofar2/model/rtdsp/multirate.py @@ -658,605 +658,3 @@ def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate an print(' . len(y) =', str(len(y))) print('') return y - - -############################################################################### -# Single bandpass channel up and downsampling and up and downconversion -############################################################################### - -def unit_circle_loops_phasor_arr(k, N, sign): - """Return array of N phasors on k loops along the unit circle. - - Polyphase dependent phase offsets for bin k, when N = Ndown = Ndft [HARRIS - Eq 6.8]. For k = 1 this yields the roots of unity. - - Input: - . k: bin in range(N) - . N: number of phasors - . sign: +1 or -1 term in exp() - Return: - . pArr: exp(sign 2j pi k / N) for k in 0 : N - 1 - """ - pArr = np.array([np.exp(sign * 2j * np.pi * p * k / N) for p in range(N)]) - return pArr - - -def time_shift_phasor_arr(k, M, Ndft, Msamples, sign): - """Return array of Msamples phasors in time to compensate for oversampling - time shift. - - The time shift due to down or upsampling causes a frequency component of - k * M / Ndft. With oversampling M < Ndft, and then after down or upsampling - there remains a frequency offset that can be compensated given by mArr - [HARRIS Eq 9.3]. - - Input: - . k: Index of BPF center frequency w_k = 2 pi k / Ndft - . M: downsample or upsample factor - . Ndft: DFT size, number of bins and number of polyphases in PFS FIR filter - . Msamples: Requested number of output samples in time - . sign: +1 or -1 term in exp() - Return: - . mArr = exp(sign 2j pi k * M / Ndft * m), for m in 0 : Msamples - 1 - """ - mArr = np.exp(sign * 2j * np.pi * k * M / Ndft * np.arange(Msamples)) - return mArr - - -def maximal_downsample_bpf(x, Ndft, k, coefs, verbosity=1): - """BPF x at bin k in range(Ndft) and downsample x by factor Ndown = Ndft. - - Implement maximal downsampling downconverter for one bin (= critically - sampled) [HARRIS Fig 6.14]. - - The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of - frequency bins, is DFT size. The downsampling is maximal so Ndown = Ndft. - The polyphase structure has Nphases = Ndft branches, so the input x - data that shifts in remains in each branch. Therefore each branch can be - FIR filtered independently for the whole input x using - polyphase_frontend(). - - Input: - . x: Input signal x[n] - . Ndft: downsample factor, number of frequency bins - . k: Index of BPF center frequency w_k = 2 pi k / Ndft - . coefs: prototype FIR filter coefficients for anti aliasing BPF - - verbosity: when > 0 print() status, else no print() - Return: - . yc: Downsampled and downconverted output signal yc[m], m = n // Ndown - for bin k. Complex baseband signal. - """ - # Polyphase FIR filter input x - polyY, Nx, Nxp = polyphase_frontend(x, Ndft, coefs, 'down') - - # Phase rotate per polyphase for bin k, due to delay line at branch inputs - # [HARRIS Eq 6.8] - kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) - polyYc = np.zeros((Ndft, Nxp), dtype='cfloat') - for p in range(Ndft): - polyYc[p] = polyY[p] * kPhasors[p] # row = row * scalar - - # Sum the branch outputs to get single downsampled and downconverted output - # complex baseband value yc. - yc = np.sum(polyYc, axis=0) - - if verbosity: - print('> Log maximal_downsample_bpf():') - print(' . len(x) =', str(len(x))) - print(' . Nx =', str(Nx)) - print(' . Nxp =', str(Nxp)) - print(' . len(yc) =', str(len(yc))) # = Nxp - print(' . Ndown = Ndft =', str(Ndft)) - print(' . k =', str(k)) - print('') - return yc - - -def maximal_upsample_bpf(xBase, Ndft, k, coefs, verbosity=1): - """BPF xBase at bin k in range(Ndft), and upsample xBase by factor Nup = - Ndft. - - Implement maximal upsampling upconverter for one bin (= critically - sampled) [HARRIS Fig. 7.16]. - - The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of - frequency bins, is DFT size. The upsampling is maximal so Nup = Ndft. The - polyphase structure has Nphases = Ndft branches, so the output yc data - shifts out from the same branch for each block of Ndft samples. Therefore - each branch can be FIR filtered independently for the whole input xBase - using polyphase_frontend(). - - Input: - . xBase: Input equivalent baseband signal - . Ndft: upsample factor, number of frequency bins - . k: Index of BPF center frequency w_k = 2 pi k / Ndft - . coefs: prototype FIR filter coefficients for anti aliasing and - interpolating BPF - - verbosity: when > 0 print() status, else no print() - Return: - . yc: Upsampled and upconverted output signal yc[n], n = m U at - intermediate frequency (IF) of bin k. Complex positive frequencies - only signal. - """ - # Polyphase FIR filter input xBase - polyY, Nx, Nxp = polyphase_frontend(xBase, Ndft, coefs, 'up') - - # Adjust LO phase for group delay of analysis LPF and synthesis LPF in - # series. This is not needed in practise, but is needed to be able to - # exactly compare reconstructed signal with original input time series - # signal. Assume coefs has group delay (len(coefs) - 1) / 2. - hPairGroupDelay = len(coefs) - 1 - hPhasor = np.exp(-2j * np.pi * k / Ndft * hPairGroupDelay) - polyY *= hPhasor - - # Phase rotate per polyphase for bin k, due to delay line at branch inputs - # [HARRIS Eq 7.8 = 6.8, Fig 7.16], can be applied after the FIR filter, - # because the kPhasors per polyphase are constants. - kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) - polyYc = np.zeros((Ndft, Nxp), dtype='cfloat') - for p in range(Ndft): - polyYc[p] = polyY[p] * kPhasors[p] # row = row * scalar - - # Output Ndft samples yc[m] for every input sample xBase[n] - yc = polyYc.T.reshape(1, Ndft * Nx)[0] - - if verbosity: - print('> Log maximal_upsample_bpf():') - print(' . len(xBase) =', str(len(xBase))) - print(' . Nx =', str(Nx)) - print(' . Nxp =', str(Nxp)) - print(' . len(yc) =', str(len(yc))) # = Nxp - print(' . Nup = Ndft =', str(Ndft)) - print(' . k =', str(k)) - print('') - return yc - - -def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1): - """BPF x at bin k in range(Ndown), and downsample x by factor D = Ndown. - - Implement nonmaximal downsampling downconverter for one bin. The maximal - downsampling downconverter for one bin has the kPhasors per polyphase - branch of [HARRIS Eq. 6.8 and Fig 6.14]. For nonmaximal this needs to be - extended with the tPhasors per polyphase branch of [HARRIS Eq. 9.3, to - compensate for the oversampling time shift. - - The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of - frequency bins, is DFT size. The polyphase FIR structure has Nphases = Ndft - branches, to fit the requested number of bins. The polyphase FIR structure - is maximally downsampled (= critically sampled) for Ndown = Ndft, but it - can support any Ndown <= Ndft. The input data shifts in per Ndown samples, - so it appears in different branches when Ndown < Ndft and a new block is - shifted in. Therefore the input data cannot be FIR filtered per branch for - the whole input x. Instead it needs to be FIR filtered per block of Ndown - input samples from x, using pfs.polyDelays in pfs.filter_block(). - - Input: - . x: Input signal x[n] - . Ndown: downsample factor - . k: Index of BPF center frequency w_k = 2 pi k / Ndft - . Ndft: DFT size, number of polyphases in PFS FIR filter - . coefs: prototype LPF FIR filter coefficients for anti aliasing BPF - - verbosity: when > 0 print() status, else no print() - Return: - . yc: Downsampled and downconverted output signal yc[m], m = n // D for - bin k. Complex baseband signal. - """ - Ros = Ndft / Ndown - - # Prepend x with Ndown - 1 zeros, and represent x in Nblocks of Ndown samples - Nzeros = Ndown - 1 - xBlocks, _, Nx, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros) - - # Prepare output - yc = np.zeros(Nblocks, dtype='cfloat') - - # PFS with Ndft polyphases - pfs = PolyPhaseFirFilterStructure(Ndft, coefs) - kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) # [HARRIS Eq 6.8] - - # Oversampling time shift compensation via frequency dependent phase shift - # [HARRIS Eq 9.3] - tPhasors = time_shift_phasor_arr(k, Ndown, Ndft, Nblocks, -1) - - for b in range(Nblocks): - # Filter block - inData = xBlocks[:, b] - pfsData = pfs.filter_block(inData) - # Phase rotate polyphases for bin k - pfsBinData = pfsData * kPhasors # [HARRIS Eq 6.8, 9.3] - # Sum the polyphases to get single downsampled and downconverted output - # value - yc[b] = np.sum(pfsBinData) * tPhasors[b] - - if verbosity: - print('> non_maximal_downsample_bpf():') - print(' . len(x) =', str(len(x))) - print(' . Nx =', str(Nx)) - print(' . Nblocks =', str(Nblocks)) - print(' . len(yc) =', str(len(yc))) # = Nblocks - print(' . Ros =', str(Ros)) - print(' . Ndown =', str(Ndown)) - print(' . Ndft =', str(Ndft)) - print(' . k =', str(k)) - print('') - return yc - - -def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1): - """BPF xBase at bin k in range(Nup), and upsample xBase by factor U = Nup. - - Implement nonmaximal upsampling upconverter for one bin. The maximal - upsampling upconverter for one bin has the kPhasors per polyphase - branch similar of [HARRIS Fig 7.16]. For nonmaximal this needs to be - extended with the tPhasors per polyphase branch similar as for down in - [HARRIS Eq. 9.3], to compensate for the oversampling time shift. - - The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of - frequency bins, is DFT size. The polyphase FIR structure has Nphases = Ndft - branches, to fit the requested number of bins. The polyphase FIR structure - is maximally upsampled (= critically sampled) for Nup = Ndft, but it - can support any Nup <= Ndft. The output data shifts out per Nup samples, - so it appears from different branches when Nup < Ndft and a new block is - shifted out. Therefore the output data cannot be FIR filtered per branch - for the whole input xBase. Instead it needs to be FIR filtered per block of - Nup output samples, using pfs.polyDelays in pfs.filter_up(). - - TODO - . This code only runs ok for Ros = 1, 2 when Ndft = 16, but not for - Ros = 4, so need to fix that. According to [CROCHIERE 7.2.7] the - polyphase structure is only suitable for Ros is integer >= 1. For other - Ros > 1 the weighted overlap-add (WOLA) structure is suitable, so - need to add WOLA. - - Input: - . xBase: Input equivalent baseband signal xBase[m] - . Nup: upsample factor - . k: Index of BPF center frequency w_k = 2 pi k / Ndft - . Ndft: DFT size, number of polyphases in PFS FIR filter - . coefs: prototype LPF FIR filter coefficients for anti aliasing and - interpolationBPF - - verbosity: when > 0 print() status, else no print() - Return: - . yc: Upsampled and upconverted output signal yc[n], n = m U at - intermediate frequency (IF) of bin k. Complex positive frequencies - only signal. - """ - Ros = Ndft // Nup - # if Ndft % Nup: - # print('WARNING: Only support integer Ros') - # return None - - Nblocks = len(xBase) - - # Prepare output - polyYc = np.zeros((Nup, Nblocks), dtype='cfloat') - - # Adjust LO phase for group delay of analysis LPF and synthesis LPF in - # series. This is not needed in practise, but is needed to be able to - # exactly compare reconstructed signal with original input time series - # signal. Assume coefs has group delay (len(coefs) - 1) / 2. - hPairGroupDelay = len(coefs) - 1 - hPhasor = np.exp(-2j * np.pi * k / Ndft * hPairGroupDelay) - - # Oversampling time shift compensation via frequency dependent phase shift - tPhasors = time_shift_phasor_arr(k, Nup, Ndft, Nblocks, -1) # [HARRIS Eq 9.3] - - # PFS with Ndft polyphases - pfs = PolyPhaseFirFilterStructure(Ndft, coefs, cmplx=True) - kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) # [HARRIS Eq 7.8] - - # Polyphase FIR filter input xBase - for b in range(Nblocks): - # Phase rotate polyphases for bin k - inData = xBase[b] * hPhasor * tPhasors[b] * kPhasors - - # Filter block [HARRIS Fig 9.15, CROCHIERE Fig 7.16]] - pfsData = pfs.filter_block(inData) - - # Output Nup samples yc[n] for every input sample xBase[m] - polyYc[:, b] = Ndft * pfsData - - yc = polyYc.T.reshape(1, Nup * Nblocks)[0] - - if verbosity: - print('> non_maximal_upsample_bpf():') - print(' . len(xBase) =', str(len(xBase))) - print(' . Nblocks =', str(Nblocks)) - print(' . len(yc) =', str(len(yc))) # = Nblocks - print(' . Ros =', str(Ros)) - print(' . Nup =', str(Nup)) - print(' . Ndft =', str(Ndft)) - print(' . k =', str(k)) - print('') - return yc - - -def analysis_dft_filterbank(x, Ndown, Ndft, coefs, structure, commutator=None, verbosity=1): - """Analysis DFT filterbank with Ros = Ndft / Ndown. - - Implements WOLA structure for DFT filterbank [CROCHIERE Fig 7.19]. Key - steps: - - - Signal x has time index n. Mixer local oscillator (LO) and LPF and - downsampler M, yields the short-time spectrum of signal x at time n = mM, - so M is the block size or downsample rate [CROCHIERE Eq 7.65 = Eq 7.9, - HARRIS Eq 6.1] - - Change from fixed signal time frame n and sliding filter, to sliding time - frame r with fixed filter. This yields factor W_K^(kmM) [CROCHIERE Eq - 7.69], with K = Ndft. - - Assume filter h length is Ncoefs = Ntaps * K, for K frequency bins. The - h[-r] weights the signal at n = nM, so the DFT of the weighted signal has - Ncoefs bins k', but for the filterbank only bins k' = k Ntaps are needed. - This pruned DFT can be calculated by stacking the blocks of K samples of - the weighted signal and then calculating the K point DFT [CROCHIERE Eq - 7.73, Fig 7.19, BUNTON]. The stacking sum is like polyphase FIR with K - branches, and data is shifted in per block of M samples from x. - - The time (m) and bin (k) dependend phase shift W_K^(kmM) = - exp(-j 2pi k m M / K) of the DFT output is equivalent to a circular time - shift by l = (m M) % K samples of the DFT input. Circular time shift DFT - theorem [CROCHIERE Fig 7.21]: - x[n] <= DFT => X(k), then - x[(n - l) % K] <= DFT => X(k) exp(-j 2pi k l / K) - - Input: - . x: Input signal x[n] - . Ndown: downsample factor and input block size - . Ndft: DFT size, number of polyphases in PFS FIR filter - . coefs: prototype LPF FIR filter coefficients for anti aliasing BPF - . structure: 'wola' or 'pfs' - . commutator: only for 'pfs', counter clockwise 'ccw' to use IDFT or - clockwise 'cw' to use DFT. Both yield identical result Yc, because - IDFT(x) = 1/Ndft * DFT(fold(x)). - - verbosity: when > 0 print() status, else no print() - Return: - . Yc: Downsampled and downconverted output signal Yc[k, m], m = n // M for - all Ndft bins k. Complex baseband signals. - """ - Ros = Ndft / Ndown - - # Prepend x with Ndown - 1 zeros, and represent x in Nblocks of Ndown - # samples - Nzeros = Ndown - 1 - xBlocks, xData, Nx, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros) - - # Prepare output - Yc = np.zeros((Ndft, Nblocks), dtype='cfloat') - - # Oversampling analysis DFT filterbank - # . note: fold = roll -1 then flip = flip then roll +1 - if structure == 'wola': - # >>> Use WOLA structure - # Use PFS with Ndft polyphases and shift in xData in overlapping - # blocks of Ncoefs samples, to apply coefs as window. The coefs in - # the pfs are already in flipped order conform [CROCHIERE Eq 7.70]. - pfs = PolyPhaseFirFilterStructure(Ndft, coefs) - # print(pfs.coefs) - # print(pfs.polyCoefs) - - # Prepend xData with pfs.Ncoefs - Ndown zeros to align with 'pfs' - # implementation - xData = np.concatenate((np.zeros(pfs.Ncoefs - Ndown), xData)) - - # PFB loop per Ndown input xData samples - for b in range(Nblocks): - # Window - tRange = np.arange(pfs.Ncoefs) + b * Ndown - tData = xData[tRange] - pfs.shift_in_data(np.flip(tData)) - if b < 3: - # print(b) - # print(tData) - # print(pfs.polyDelays) - pass - zPoly = pfs.polyDelays * pfs.polyCoefs - # Sum pfs.Ntaps number of blocks - pfsData = np.sum(zPoly, axis=1) - # Time shift - tShift = b * Ndown # roll is modulo Ndft - # DFT - pfsShifted = np.roll(pfsData, -tShift) - # fold = roll -1 then flip - pfsShifted = fold(pfsShifted) - Yc[:, b] = np.fft.fft(pfsShifted) - if b < 3: - # print(b, pfs.map_to_delay_line(pfs.polyDelays)) - # print(b, np.flip(pfs.map_to_delay_line(zPoly))) - # print(b, np.flip(pfsData)) - # print(b, Yc[:, b]) - pass - - elif structure == 'bunton': - # >>> Use WOLA structure as in reconstruct.m by John Bunton - Ncoefs = len(coefs) - Ntaps = ceil_div(Ncoefs, Ndft) # taps, columns - Ndelays = Ntaps * Ndft # zero padded coefs - # Prepend xData with pfs.Ncoefs - Ndown zeros to align with 'pfs' - # implementation - xData = np.concatenate((np.zeros(Ncoefs - Ndown), xData)) - # Use coefs as window - wCoefs = coefs - wCoefs = np.flip(coefs) - # Time shift offset, due to roll -1 in fold() ?, to align with LO in - # single channel reference. - tOffset = 1 - # PFB loop per Ndown input xData samples - for b in range(Nblocks): - # Time shift - tShift = b * Ndown - # Apply window - tRange = np.arange(Ncoefs) + tShift - tData = xData[tRange] - zLine = np.zeros(Ndelays, dtype='float') - zLine[0:Ncoefs] = tData * wCoefs - # Sum Ntaps - sumLine = np.zeros(Ndft, dtype='cfloat') - for t in range(Ntaps): - sumLine += zLine[np.arange(Ndft) + t * Ndft] - # Fixed time reference, roll is modulo Ndft - sumShifted = np.roll(sumLine, tOffset + tShift) - # DFT - Yc[:, b] = np.fft.fft(sumShifted) - - elif structure == 'pfs': - # >>> Use PFS - # PFS with Ndft polyphases and shift in xBlocks of Ndown samples - pfs = PolyPhaseFirFilterStructure(Ndft, coefs) - # PFB loop per Ndown input xData samples - for b in range(Nblocks): - # Filter block, the inData blocks are already flipped in - # polyphase_data_for_downsampling_whole_x() - inData = xBlocks[:, b] - pfsData = pfs.filter_block(inData) - - # Time (m) and bin (k) dependend phase shift W_K^(kmM) by circular - # time shift of DFT input - tShift = b * Ndown # roll is modulo Ndft - pfsShifted = np.roll(pfsData, -tShift) - - # For 'ccw' apply IDFT, for 'cw' apply DFT(fold()), due to fold() - # both yield identical result - if commutator == 'cw': # DFT - # For 'cw' fold the pfsData to apply the DFT - pfsShifted = np.roll(pfsShifted, -1) - pfsShifted = np.flip(pfsShifted) - Yc[:, b] = np.fft.fft(pfsShifted) - elif commutator == 'ccw': # IDFT - # With 'ccw' keep the pfsData to apply IDFT - Yc[:, b] = np.fft.ifft(pfsShifted) * Ndft - else: - exit('ERROR: invalid commutator ' + str(commutator)) - else: - exit('ERROR: invalid structure ' + str(structure)) - - if verbosity: - print('> analysis_dft_filterbank():') - print(' . len(x) =', str(len(x))) - print(' . Nx =', str(Nx)) - print(' . Nblocks =', str(Nblocks)) - print(' . Ros =', str(Ros)) - print(' . Ndown =', str(Ndown)) - print(' . Ndft =', str(Ndft)) - print(' . structure =', structure) - if structure == 'pfs': - print(' . commutator =', commutator) - print('') - return Yc - - -def synthesis_dft_filterbank(Xbase, Nup, Ndft, coefs, structure, commutator=None, verbosity=1): - """Synthesis DFT filterbank with Ros = Ndft / Nup. - - Implements WOLA structure for DFT filterbank [CROCHIERE Fig 7.20]. Key - steps: - - - Signal Xbase has time index m and Ndft values. - - Input: - . Xbase: Complex baseband signals for Ndft bins, and Nblocks in time m. - . Nup: upsample factor and output block size - . Ndft: DFT size, number of polyphases in PFS FIR filter - . coefs: prototype LPF FIR filter coefficients for anti aliasing and - interpolating BPF - . structure: 'wola' or 'pfs' - . commutator: 'ccw' to use DFT or 'cw' to use IDFT - - verbosity: when > 0 print() status, else no print() - Return: - . y: Upsampled and upconverted output signal y[n]. - """ - Ros = Ndft / Nup - - # Xbase has Ndft rows and Nblocks columns - Nblocks = np.size(Xbase, axis=1) - - # Prepare output - Noutput = Nup * Nblocks + len(coefs) - y = np.zeros(Noutput, dtype='float') - - # Adjust LO phase for group delay of analysis LPF and synthesis LPF in - # series. This is not needed in practise, but is needed to be able to - # exactly compare reconstructed signal with original input time series - # signal. Assume coefs has group delay (len(coefs) - 1) / 2. - hPairGroupDelay = len(coefs) - 1 - - # Oversampling analysis DFT filterbank - if structure == 'wola': - # >>> Use WOLA structure - # Use PFS with Ndft polyphases for delay line and window - pfs = PolyPhaseFirFilterStructure(Ndft, coefs) - - # PFB loop per Nup output y samples - for b in range(Nblocks): - # For 'ccw' apply IDFT, for 'cw' apply DFT(fold()), due to fold() - # both # yield identical result - if commutator == 'cw': # DFT - # For 'cw' fold the Xbase to apply the DFT - xTime = np.real(np.fft.fft(fold(Xbase[:, b]))) - else: # 'ccw', IDFT - # With 'ccw' keep the Xbase to apply IDFT - xTime = Ndft * np.real(np.fft.ifft(Xbase[:, b])) - - # Apply filter group delay (hPairGroupDelay), and time (m) and bin - # (k) dependend phase shift W_K^(kmM) by circular time shift of - # DFT output - tShift = b * Nup - hPairGroupDelay # roll is modulo Ndft - xShifted = np.roll(xTime, -tShift) - - # Load xTime at each tap in polyDelays - pfs.parallel_load(xShifted) - - # Apply FIR coefs per delay element - zPoly = Nup * pfs.polyDelays * pfs.polyCoefs - zLine = pfs.map_to_delay_line(zPoly) - - # Overlap add weigthed input to the output - tRange = np.arange(pfs.Ndelays) + b * Nup - y[tRange] += zLine - if b < 3: - # print(b, Xbase[:, b]) - # print(b, xTime / Ndft) - # print(b, zLine / Ndft / Nup) - # print(b, y[tRange] / Ndft / Nup) - pass - - elif structure == 'bunton': - # >>> Use WOLA structure as in reconstruct.m by John Bunton - Ncoefs = len(coefs) - Ntaps = ceil_div(Ncoefs, Ndft) # taps, columns - Ndelays = Ntaps * Ndft # zero padded coefs - # Use coefs as window - wCoefs = coefs - # wCoefs = np.flip(coefs) - for b in range(Nblocks): - # IDFT - xTime = Ndft * np.real(np.fft.ifft(Xbase[:, b])) - # Sliding time reference - tShift = b * Nup - hPairGroupDelay # roll is modulo Ndft - xShifted = np.roll(xTime, -tShift) - # Copy data at Ntaps - zLine = np.zeros(Ndelays, dtype='float') - for t in range(Ntaps): - zLine[np.arange(Ndft) + t * Ndft] = xShifted - # Apply window - yLine = np.zeros(Ncoefs, dtype='float') - yLine = Nup * zLine[0:Ncoefs] * wCoefs - # Overlap add weigthed input to the output - tRange = np.arange(Ncoefs) + b * Nup - y[tRange] += yLine - - else: - # No 'pfs' for fractional oversampled synthesis, because the - # interpolator cannot be separated in independent branches - # then. Expect when Ros is integer [CROCHIERE 7.2.4]. - exit('ERROR: invalid structure ' + str(structure)) - - if verbosity: - print('> synthesis_dft_filterbank():') - print(' . Nblocks =', str(Nblocks)) - print(' . Ros =', str(Ros)) - print(' . Nup =', str(Nup)) - print(' . Ndft =', str(Ndft)) - print(' . structure =', structure) - print(' . commutator =', commutator) - print('') - return y diff --git a/applications/lofar2/model/rtdsp/singlechannel.py b/applications/lofar2/model/rtdsp/singlechannel.py new file mode 100644 index 0000000000000000000000000000000000000000..ced9bcf3b25065bf72dd72abad3964fa4899b27e --- /dev/null +++ b/applications/lofar2/model/rtdsp/singlechannel.py @@ -0,0 +1,350 @@ +#! /usr/bin/env python3 +############################################################################### +# +# Copyright 2024 +# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/> +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +############################################################################### + +# Author: Eric Kooistra +# Purpose: Single channel downconverter downsampler and single channel +# upconverter upsampler, using LO +# Description: +# +# Usage: +# . Use [2] to verify this code. +# +# References: +# [1] dsp_study_erko.txt +# [2] pfb_os/multirate_mixer.ipynb +# +# Books: +# . LYONS +# . HARRIS +# . CROCHIERE + +import numpy as np +from .multirate import PolyPhaseFirFilterStructure, \ + polyphase_frontend, \ + polyphase_data_for_downsampling_whole_x + + +def unit_circle_loops_phasor_arr(k, N, sign): + """Return array of N phasors on k loops along the unit circle. + + Polyphase dependent phase offsets for bin k, when N = Ndown = Ndft [HARRIS + Eq 6.8]. For k = 1 this yields the roots of unity. + + Input: + . k: bin in range(N) + . N: number of phasors + . sign: +1 or -1 term in exp() + Return: + . pArr: exp(sign 2j pi k / N) for k in 0 : N - 1 + """ + pArr = np.array([np.exp(sign * 2j * np.pi * p * k / N) for p in range(N)]) + return pArr + + +def time_shift_phasor_arr(k, M, Ndft, Msamples, sign): + """Return array of Msamples phasors in time to compensate for oversampling + time shift. + + The time shift due to down or upsampling causes a frequency component of + k * M / Ndft. With oversampling M < Ndft, and then after down or upsampling + there remains a frequency offset that can be compensated given by mArr + [HARRIS Eq 9.3]. + + Input: + . k: Index of BPF center frequency w_k = 2 pi k / Ndft + . M: downsample or upsample factor + . Ndft: DFT size, number of bins and number of polyphases in PFS FIR filter + . Msamples: Requested number of output samples in time + . sign: +1 or -1 term in exp() + Return: + . mArr = exp(sign 2j pi k * M / Ndft * m), for m in 0 : Msamples - 1 + """ + mArr = np.exp(sign * 2j * np.pi * k * M / Ndft * np.arange(Msamples)) + return mArr + + +def maximal_downsample_bpf(x, Ndft, k, coefs, verbosity=1): + """BPF x at bin k in range(Ndft) and downsample x by factor Ndown = Ndft. + + Implement maximal downsampling downconverter for one bin (= critically + sampled) [HARRIS Fig 6.14]. + + The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of + frequency bins, is DFT size. The downsampling is maximal so Ndown = Ndft. + The polyphase structure has Nphases = Ndft branches, so the input x + data that shifts in remains in each branch. Therefore each branch can be + FIR filtered independently for the whole input x using + polyphase_frontend(). + + Input: + . x: Input signal x[n] + . Ndft: downsample factor, number of frequency bins + . k: Index of BPF center frequency w_k = 2 pi k / Ndft + . coefs: prototype FIR filter coefficients for anti aliasing BPF + - verbosity: when > 0 print() status, else no print() + Return: + . yc: Downsampled and downconverted output signal yc[m], m = n // Ndown + for bin k. Complex baseband signal. + """ + # Polyphase FIR filter input x + polyY, Nx, Nxp = polyphase_frontend(x, Ndft, coefs, 'down') + + # Phase rotate per polyphase for bin k, due to delay line at branch inputs + # [HARRIS Eq 6.8] + kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) + polyYc = np.zeros((Ndft, Nxp), dtype='cfloat') + for p in range(Ndft): + polyYc[p] = polyY[p] * kPhasors[p] # row = row * scalar + + # Sum the branch outputs to get single downsampled and downconverted output + # complex baseband value yc. + yc = np.sum(polyYc, axis=0) + + if verbosity: + print('> Log maximal_downsample_bpf():') + print(' . len(x) =', str(len(x))) + print(' . Nx =', str(Nx)) + print(' . Nxp =', str(Nxp)) + print(' . len(yc) =', str(len(yc))) # = Nxp + print(' . Ndown = Ndft =', str(Ndft)) + print(' . k =', str(k)) + print('') + return yc + + +def maximal_upsample_bpf(xBase, Ndft, k, coefs, verbosity=1): + """BPF xBase at bin k in range(Ndft), and upsample xBase by factor Nup = + Ndft. + + Implement maximal upsampling upconverter for one bin (= critically + sampled) [HARRIS Fig. 7.16]. + + The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of + frequency bins, is DFT size. The upsampling is maximal so Nup = Ndft. The + polyphase structure has Nphases = Ndft branches, so the output yc data + shifts out from the same branch for each block of Ndft samples. Therefore + each branch can be FIR filtered independently for the whole input xBase + using polyphase_frontend(). + + Input: + . xBase: Input equivalent baseband signal + . Ndft: upsample factor, number of frequency bins + . k: Index of BPF center frequency w_k = 2 pi k / Ndft + . coefs: prototype FIR filter coefficients for anti aliasing and + interpolating BPF + - verbosity: when > 0 print() status, else no print() + Return: + . yc: Upsampled and upconverted output signal yc[n], n = m U at + intermediate frequency (IF) of bin k. Complex positive frequencies + only signal. + """ + # Polyphase FIR filter input xBase + polyY, Nx, Nxp = polyphase_frontend(xBase, Ndft, coefs, 'up') + + # Adjust LO phase for group delay of analysis LPF and synthesis LPF in + # series. This is not needed in practise, but is needed to be able to + # exactly compare reconstructed signal with original input time series + # signal. Assume coefs has group delay (len(coefs) - 1) / 2. + hPairGroupDelay = len(coefs) - 1 + hPhasor = np.exp(-2j * np.pi * k / Ndft * hPairGroupDelay) + polyY *= hPhasor + + # Phase rotate per polyphase for bin k, due to delay line at branch inputs + # [HARRIS Eq 7.8 = 6.8, Fig 7.16], can be applied after the FIR filter, + # because the kPhasors per polyphase are constants. + kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) + polyYc = np.zeros((Ndft, Nxp), dtype='cfloat') + for p in range(Ndft): + polyYc[p] = polyY[p] * kPhasors[p] # row = row * scalar + + # Output Ndft samples yc[m] for every input sample xBase[n] + yc = polyYc.T.reshape(1, Ndft * Nx)[0] + + if verbosity: + print('> Log maximal_upsample_bpf():') + print(' . len(xBase) =', str(len(xBase))) + print(' . Nx =', str(Nx)) + print(' . Nxp =', str(Nxp)) + print(' . len(yc) =', str(len(yc))) # = Nxp + print(' . Nup = Ndft =', str(Ndft)) + print(' . k =', str(k)) + print('') + return yc + + +def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1): + """BPF x at bin k in range(Ndown), and downsample x by factor D = Ndown. + + Implement nonmaximal downsampling downconverter for one bin. The maximal + downsampling downconverter for one bin has the kPhasors per polyphase + branch of [HARRIS Eq. 6.8 and Fig 6.14]. For nonmaximal this needs to be + extended with the tPhasors per polyphase branch of [HARRIS Eq. 9.3, to + compensate for the oversampling time shift. + + The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of + frequency bins, is DFT size. The polyphase FIR structure has Nphases = Ndft + branches, to fit the requested number of bins. The polyphase FIR structure + is maximally downsampled (= critically sampled) for Ndown = Ndft, but it + can support any Ndown <= Ndft. The input data shifts in per Ndown samples, + so it appears in different branches when Ndown < Ndft and a new block is + shifted in. Therefore the input data cannot be FIR filtered per branch for + the whole input x. Instead it needs to be FIR filtered per block of Ndown + input samples from x, using pfs.polyDelays in pfs.filter_block(). + + Input: + . x: Input signal x[n] + . Ndown: downsample factor + . k: Index of BPF center frequency w_k = 2 pi k / Ndft + . Ndft: DFT size, number of polyphases in PFS FIR filter + . coefs: prototype LPF FIR filter coefficients for anti aliasing BPF + - verbosity: when > 0 print() status, else no print() + Return: + . yc: Downsampled and downconverted output signal yc[m], m = n // D for + bin k. Complex baseband signal. + """ + Ros = Ndft / Ndown + + # Prepend x with Ndown - 1 zeros, and represent x in Nblocks of Ndown samples + Nzeros = Ndown - 1 + xBlocks, _, Nx, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros) + + # Prepare output + yc = np.zeros(Nblocks, dtype='cfloat') + + # PFS with Ndft polyphases + pfs = PolyPhaseFirFilterStructure(Ndft, coefs) + kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) # [HARRIS Eq 6.8] + + # Oversampling time shift compensation via frequency dependent phase shift + # [HARRIS Eq 9.3] + tPhasors = time_shift_phasor_arr(k, Ndown, Ndft, Nblocks, -1) + + for b in range(Nblocks): + # Filter block + inData = xBlocks[:, b] + pfsData = pfs.filter_block(inData) + # Phase rotate polyphases for bin k + pfsBinData = pfsData * kPhasors # [HARRIS Eq 6.8, 9.3] + # Sum the polyphases to get single downsampled and downconverted output + # value + yc[b] = np.sum(pfsBinData) * tPhasors[b] + + if verbosity: + print('> non_maximal_downsample_bpf():') + print(' . len(x) =', str(len(x))) + print(' . Nx =', str(Nx)) + print(' . Nblocks =', str(Nblocks)) + print(' . len(yc) =', str(len(yc))) # = Nblocks + print(' . Ros =', str(Ros)) + print(' . Ndown =', str(Ndown)) + print(' . Ndft =', str(Ndft)) + print(' . k =', str(k)) + print('') + return yc + + +def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1): + """BPF xBase at bin k in range(Nup), and upsample xBase by factor U = Nup. + + Implement nonmaximal upsampling upconverter for one bin. The maximal + upsampling upconverter for one bin has the kPhasors per polyphase + branch similar of [HARRIS Fig 7.16]. For nonmaximal this needs to be + extended with the tPhasors per polyphase branch similar as for down in + [HARRIS Eq. 9.3], to compensate for the oversampling time shift. + + The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of + frequency bins, is DFT size. The polyphase FIR structure has Nphases = Ndft + branches, to fit the requested number of bins. The polyphase FIR structure + is maximally upsampled (= critically sampled) for Nup = Ndft, but it + can support any Nup <= Ndft. The output data shifts out per Nup samples, + so it appears from different branches when Nup < Ndft and a new block is + shifted out. Therefore the output data cannot be FIR filtered per branch + for the whole input xBase. Instead it needs to be FIR filtered per block of + Nup output samples, using pfs.polyDelays in pfs.filter_up(). + + TODO + . This code only runs ok for Ros = 1, 2 when Ndft = 16, but not for + Ros = 4, so need to fix that. According to [CROCHIERE 7.2.7] the + polyphase structure is only suitable for Ros is integer >= 1. For other + Ros > 1 the weighted overlap-add (WOLA) structure is suitable, so + need to add WOLA. + + Input: + . xBase: Input equivalent baseband signal xBase[m] + . Nup: upsample factor + . k: Index of BPF center frequency w_k = 2 pi k / Ndft + . Ndft: DFT size, number of polyphases in PFS FIR filter + . coefs: prototype LPF FIR filter coefficients for anti aliasing and + interpolationBPF + - verbosity: when > 0 print() status, else no print() + Return: + . yc: Upsampled and upconverted output signal yc[n], n = m U at + intermediate frequency (IF) of bin k. Complex positive frequencies + only signal. + """ + Ros = Ndft // Nup + # if Ndft % Nup: + # print('WARNING: Only support integer Ros') + # return None + + Nblocks = len(xBase) + + # Prepare output + polyYc = np.zeros((Nup, Nblocks), dtype='cfloat') + + # Adjust LO phase for group delay of analysis LPF and synthesis LPF in + # series. This is not needed in practise, but is needed to be able to + # exactly compare reconstructed signal with original input time series + # signal. Assume coefs has group delay (len(coefs) - 1) / 2. + hPairGroupDelay = len(coefs) - 1 + hPhasor = np.exp(-2j * np.pi * k / Ndft * hPairGroupDelay) + + # Oversampling time shift compensation via frequency dependent phase shift + tPhasors = time_shift_phasor_arr(k, Nup, Ndft, Nblocks, -1) # [HARRIS Eq 9.3] + + # PFS with Ndft polyphases + pfs = PolyPhaseFirFilterStructure(Ndft, coefs, cmplx=True) + kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) # [HARRIS Eq 7.8] + + # Polyphase FIR filter input xBase + for b in range(Nblocks): + # Phase rotate polyphases for bin k + inData = xBase[b] * hPhasor * tPhasors[b] * kPhasors + + # Filter block [HARRIS Fig 9.15, CROCHIERE Fig 7.16]] + pfsData = pfs.filter_block(inData) + + # Output Nup samples yc[n] for every input sample xBase[m] + polyYc[:, b] = Ndft * pfsData + + yc = polyYc.T.reshape(1, Nup * Nblocks)[0] + + if verbosity: + print('> non_maximal_upsample_bpf():') + print(' . len(xBase) =', str(len(xBase))) + print(' . Nblocks =', str(Nblocks)) + print(' . len(yc) =', str(len(yc))) # = Nblocks + print(' . Ros =', str(Ros)) + print(' . Nup =', str(Nup)) + print(' . Ndft =', str(Ndft)) + print(' . k =', str(k)) + print('') + return yc