diff --git a/applications/lofar2/model/rtdsp/multirate.py b/applications/lofar2/model/rtdsp/multirate.py index 3c0bd409e4df12827849eadcc803b86632c4229d..2388ea34388ea18bae5fa7ba092005c438042650 100644 --- a/applications/lofar2/model/rtdsp/multirate.py +++ b/applications/lofar2/model/rtdsp/multirate.py @@ -307,16 +307,15 @@ def polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros): for m = 0, 1, 2, ..., Nxp - 1. """ Lx = len(x) - Nxp = (Nzeros - 1 + Lx) // Ndown # Number of samples per polyphase + Nxp = (Nzeros + Lx) // Ndown # Number of samples per polyphase Nx = 1 + Ndown * (Nxp - 1) # Used number of samples from x # Load x into polyX with Ndown rows = polyphases - # . prepend x with Ndown - 1 zeros + # . prepend x with Nzeros zeros # . skip any last remaining samples from x, that are not enough yield a new # output FIR sum. polyX = zeros(Ndown * Nxp, np.iscomplexobj(x)) - polyX[Ndown - 1] = x[0] - polyX[Ndown:] = x[1 : Nx] + polyX[Nzeros:] = x[0 : Nx] # . Store data in time order per branch, so with oldest data left, to match # use with lfilter(). Note this differs from order of tap data in # pfs.polyDelays where newest data is left, because the block data is @@ -896,14 +895,20 @@ def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1): intermediate frequency (IF) of bin k. Complex positive frequencies only signal. """ + Ros = Ndft // Nup + if Ndft % Nup: + print('WARNING: Only support integer Ros') + return None + Nblocks = len(xBase) # Prepare output polyYc = np.zeros((Nup, Nblocks), dtype='cfloat') # PFS with Ndft polyphases - pfsUp = PolyPhaseFirFilterStructure(Nup, coefs, cmplx=True) + pfs = PolyPhaseFirFilterStructure(Ndft, coefs, cmplx=True) kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) # [HARRIS Eq 7.8] + print(kPhasors) # Oversampling time shift compensation via frequency dependent phase shift tPhasors = time_shift_phasor_arr(k, Nup, Ndft, Nblocks, -1) # [HARRIS Eq 9.3] @@ -912,12 +917,18 @@ def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1): for b in range(Nblocks): # Filter block inSample = xBase[b] - pfsData = Nup * pfsUp.filter_up(inSample) + pfsData = Nup * pfs.filter_up(inSample) # Phase rotate polyphases for bin k - pfsBinData = pfsData * kPhasors[0:Nup] + pfsBinData = pfsData * kPhasors + # Sum Ros interleaved rows to get Nup rows [HARRIS Fig 9.15, + # CROCHIERE Fig 7.16]] + pfsUpData = pfsBinData.reshape((Ros, Nup)).T + # pfsUpData = pfsBinData.reshape((Nup, Ros)) + pfsUpData = np.sum(pfsUpData, axis=1) + # pfsUpData = Ros * pfsUpData[:, 0] # Output Nup samples yc[n] for every input sample xBase[m] - outBinData = pfsBinData * tPhasors[b] - polyYc[:, b] = outBinData + outUpData = pfsUpData * tPhasors[b] + polyYc[:, b] = outUpData yc = polyYc.T.reshape(1, Nup * Nblocks)[0] @@ -931,3 +942,76 @@ def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1): print(' . k =', str(k)) print('') return yc + + +def analysis_dft_filterbank(x, Ndown, Ndft, coefs, verbosity=1): + """DFT filterbank with Ros = Ndft / Ndown. + + Implements WOLA structure for DFT filterbank [CROCHIERE 7.2.5]. Key steps: + + - Signal x has time index n. Mixer local oscillator (LO) and LPF and + downsampler M, yields the short-time spectrum of signal x at time n = mM, + so M is the block size or downsample rate [CROCHIERE Eq 7.65 = Eq 7.9, + HARRIS Eq 6.1] + - Change from fixed signal time frame n and sliding filter, to sliding time + frame r with fixed filter. This yields factor W_K^(kmM) [CROCHIERE Eq + 7.69], with K = Ndft. + - Assume filter h length is Ncoef = Ntaps * K, for K frequency bins. The + h[-r] weights the signal at n = nM, so the DFT of the weighted signal has + Ncoef bins k', but for the filterbank only bins k' = k Ntaps are needed. + This pruned DFT can be calculated by stacking the blocks of K samples of + the weighted signal and then calculating the K point DFT [CROCHIERE Eq + 7.73, Fig 7.19, BUNTON]. The stacking sum is like polyphase FIR with K + branches, and data is shifted in per block of M samples from x. + - The time (m) and bin (k) dependend phase shift W_K^(kmM) = + exp(-j 2pi k m M / K) of the DFT output is equivalent to a circular time + shift by l = (m M) % K samples of the DFT input. Circular time shift DFT + theorem: + + x[n] <= DFT => X(k), then + x[(n - l) % K] <= DFT => X(k) exp(-j 2pi k l / K) + + Input: + . x: Input signal x[n] + . Ndown: downsample factor and input block size + . Ndft: DFT size, number of polyphases in PFS FIR filter + . coefs: prototype LPF FIR filter coefficients for anti aliasing BPF + - verbosity: when > 0 print() status, else no print() + Return: + . Yc: Downsampled and downconverted output signal Yc[k, m], m = n // M for + all Ndft bins k. Complex baseband signals. + """ + # Prepend x with Ndown - 1 zeros, and represent x in Nblocks of Ndown + # samples + Nzeros = Ndown - 1 + xBlocks, Nx, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros) + + # Prepare output + Yc = np.zeros((Ndft, Nblocks), dtype='cfloat') + + # PFS with Ndft polyphases + pfs = PolyPhaseFirFilterStructure(Ndft, coefs) + + # Oversampling DFT filterbank + for b in range(Nblocks): + # Filter block + inData = xBlocks[:, b] + pfsData = pfs.filter_block(inData, flipped=True) + + # Apply time (m) and bin (k) dependend phase shift W_K^(kmM) by circular + # time shift of DFT input + tShift = (b * Ndown) % Ndft + pfsShifted = np.roll(pfsData, -tShift) + + # IDFT + Yc[:, b] = Ndft * np.fft.ifft(pfsShifted) + + if verbosity: + print('> analysis_dft_filterbank():') + print(' . len(x) =', str(len(x))) + print(' . Nx =', str(Nx)) + print(' . Nblocks =', str(Nblocks)) + print(' . Ndown =', str(Ndown)) + print(' . Ndft =', str(Ndft)) + print('') + return Yc