Skip to content
Snippets Groups Projects
Commit e1f3d19a authored by Eric Kooistra's avatar Eric Kooistra
Browse files

Remove structure bunton, instead integrate it as variant that is always verified.

parent 2768fe43
No related branches found
No related tags found
1 merge request!420Resolve RTSD-264
Pipeline #101172 passed with warnings
Source diff could not be displayed: it is too large. Options to address this: view the blob.
...@@ -38,7 +38,6 @@ ...@@ -38,7 +38,6 @@
import numpy as np import numpy as np
from sys import exit from sys import exit
from .utilities import ceil_div
from .multirate import fold, \ from .multirate import fold, \
PolyPhaseFirFilterStructure, \ PolyPhaseFirFilterStructure, \
polyphase_data_for_downsampling_whole_x polyphase_data_for_downsampling_whole_x
...@@ -70,6 +69,7 @@ def analysis_dft_filterbank(x, Ndown, Ndft, coefs, structure, commutator=None, v ...@@ -70,6 +69,7 @@ def analysis_dft_filterbank(x, Ndown, Ndft, coefs, structure, commutator=None, v
theorem [CROCHIERE Fig 7.21]: theorem [CROCHIERE Fig 7.21]:
x[n] <= DFT => X(k), then x[n] <= DFT => X(k), then
x[(n - l) % K] <= DFT => X(k) exp(-j 2pi k l / K) x[(n - l) % K] <= DFT => X(k) exp(-j 2pi k l / K)
- Note: fold = roll -1 then flip = flip then roll +1
Input: Input:
. x: Input signal x[n] . x: Input signal x[n]
...@@ -85,72 +85,37 @@ def analysis_dft_filterbank(x, Ndown, Ndft, coefs, structure, commutator=None, v ...@@ -85,72 +85,37 @@ def analysis_dft_filterbank(x, Ndown, Ndft, coefs, structure, commutator=None, v
. Yc: Downsampled and downconverted output signal Yc[k, m], m = n // M for . Yc: Downsampled and downconverted output signal Yc[k, m], m = n // M for
all Ndft bins k. Complex baseband signals. all Ndft bins k. Complex baseband signals.
""" """
Ros = Ndft / Ndown
# Prepend x with Ndown - 1 zeros, and represent x in Nblocks of Ndown
# samples
Nzeros = Ndown - 1
xBlocks, xData, Nx, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros)
# Prepare output
Yc = np.zeros((Ndft, Nblocks), dtype='cfloat')
# Oversampling analysis DFT filterbank # Oversampling analysis DFT filterbank
# . note: fold = roll -1 then flip = flip then roll +1
if structure == 'wola': if structure == 'wola':
# >>> Use WOLA structure # >>> Use WOLA structure [CROCHIERE Fig 7.19]
# Use PFS with Ndft polyphases and shift in xData in overlapping # Apply the coefs as a window conform [CROCHIERE Eq 7.70].
# blocks of Ncoefs samples, to apply coefs as window. The coefs in # For the pfs the coefs are already in the correct order
# the pfs are already in flipped order conform [CROCHIERE Eq 7.70].
pfs = PolyPhaseFirFilterStructure(Ndft, coefs) pfs = PolyPhaseFirFilterStructure(Ndft, coefs)
# print(pfs.coefs) # For the delay line the coefs still need to be flipped. This flip is
# print(pfs.polyCoefs) # omitted in reconstruct.m
wCoefs = np.flip(coefs)
# Prepend xData with pfs.Ncoefs - Ndown zeros to align with 'pfs' # Get parameters
# implementation Ncoefs = pfs.Ncoefs
xData = np.concatenate((np.zeros(pfs.Ncoefs - Ndown), xData)) Ntaps = pfs.Ntaps
Ndelays = pfs.Ndelays
# PFB loop per Ndown input xData samples # Determine Nblocks of Ndown samples in x
for b in range(Nblocks): _, _, _, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros=0)
# Window
tRange = np.arange(pfs.Ncoefs) + b * Ndown # Prepend x with Ncoefs - 1 zeros to have first down sampled sample at
tData = xData[tRange] # m = 0 start at n = 0, and to fit Nblocks for WOLA
pfs.shift_in_data(np.flip(tData)) Nzeros = pfs.Ncoefs - 1
if b < 3: xData = np.concatenate((np.zeros(Nzeros), x))
# print(b)
# print(tData) # Prepare output
# print(pfs.polyDelays) Yc = np.zeros((Ndft, Nblocks), dtype='cfloat')
pass
zPoly = pfs.polyDelays * pfs.polyCoefs # Need time shift offset to align with LO in single channel reference.
# Sum pfs.Ntaps number of blocks # This tOffset = 1 together with the flip of pfsData makes a fold(),
pfsData = np.sum(zPoly, axis=1) # so that the use of DFT in WOLA analysis is equivalent to using
# Time shift # IDFT as in LO downconverter and PFS analysis.
tShift = b * Ndown # roll is modulo Ndft
# DFT
pfsShifted = np.roll(pfsData, -tShift)
# fold = roll -1 then flip
pfsShifted = fold(pfsShifted)
Yc[:, b] = np.fft.fft(pfsShifted)
if b < 3:
# print(b, pfs.map_to_delay_line(pfs.polyDelays))
# print(b, np.flip(pfs.map_to_delay_line(zPoly)))
# print(b, np.flip(pfsData))
# print(b, Yc[:, b])
pass
elif structure == 'bunton':
# >>> Use WOLA structure as in reconstruct.m by John Bunton
Ncoefs = len(coefs)
Ntaps = ceil_div(Ncoefs, Ndft) # taps, columns
Ndelays = Ntaps * Ndft # zero padded coefs
# Prepend xData with pfs.Ncoefs - Ndown zeros to align with 'pfs'
# implementation
xData = np.concatenate((np.zeros(Ncoefs - Ndown), xData))
# Use coefs as window
wCoefs = coefs
wCoefs = np.flip(coefs)
# Time shift offset, due to roll -1 in fold() ?, to align with LO in
# single channel reference.
tOffset = 1 tOffset = 1
# PFB loop per Ndown input xData samples # PFB loop per Ndown input xData samples
for b in range(Nblocks): for b in range(Nblocks):
...@@ -158,22 +123,56 @@ def analysis_dft_filterbank(x, Ndown, Ndft, coefs, structure, commutator=None, v ...@@ -158,22 +123,56 @@ def analysis_dft_filterbank(x, Ndown, Ndft, coefs, structure, commutator=None, v
tShift = b * Ndown tShift = b * Ndown
# Apply window # Apply window
tRange = np.arange(Ncoefs) + tShift tRange = np.arange(Ncoefs) + tShift
tData = xData[tRange] tData = xData[tRange] # data for whole window
zLine = np.zeros(Ndelays, dtype='float')
zLine[0:Ncoefs] = tData * wCoefs # 1) Variant using PFS
# Shifting in the (old and new) data for the whole window is
# equivalent to shifting in only the new data
if 0:
# load whole window data
pfs.shift_in_data(np.flip(tData))
else:
# shift in new data block
tBlock = tData[Ncoefs - Ndown:]
pfs.shift_in_data(np.flip(tBlock))
zPoly = pfs.polyDelays * pfs.polyCoefs
# Sum Ntaps # Sum Ntaps
sumLine = np.zeros(Ndft, dtype='cfloat') pfsData = np.sum(zPoly, axis=1)
# Flip polyphases 0:Ndft to match time order for DFT input
pfsData = np.flip(pfsData)
# 2) Variant using a delay line like in reconstruct.m
dLine = np.zeros(Ndelays, dtype='float')
# Need to load data at end of delay line to sum the taps correctly
# in case Ncoefs < Ndelays
dLine[Ndelays - Ncoefs : Ndelays] = tData * wCoefs
# Sum Ntaps
sumLine = np.zeros(Ndft, dtype='float')
for t in range(Ntaps): for t in range(Ntaps):
sumLine += zLine[np.arange(Ndft) + t * Ndft] sumLine += dLine[np.arange(Ndft) + t * Ndft]
# Verify that 1) PFS and 2) delay line yield same result
# . The two variants differ in structure, but yield same result.
if not np.allclose(sumLine, pfsData):
exit('ERROR: wrong analysis WOLA')
# Fixed time reference, roll is modulo Ndft # Fixed time reference, roll is modulo Ndft
sumShifted = np.roll(sumLine, tOffset + tShift) pfsShifted = np.roll(pfsData, tOffset + tShift)
# DFT # DFT
Yc[:, b] = np.fft.fft(sumShifted) Yc[:, b] = np.fft.fft(pfsShifted)
elif structure == 'pfs': elif structure == 'pfs':
# >>> Use PFS
# PFS with Ndft polyphases and shift in xBlocks of Ndown samples # PFS with Ndft polyphases and shift in xBlocks of Ndown samples
pfs = PolyPhaseFirFilterStructure(Ndft, coefs) pfs = PolyPhaseFirFilterStructure(Ndft, coefs)
# Prepend x with Ndown - 1 zeros, and represent x in Nblocks of Ndown
# samples
Nzeros = Ndown - 1
xBlocks, xData, Nx, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros)
# Prepare output
Yc = np.zeros((Ndft, Nblocks), dtype='cfloat')
# PFB loop per Ndown input xData samples # PFB loop per Ndown input xData samples
for b in range(Nblocks): for b in range(Nblocks):
# Filter block, the inData blocks are already flipped in # Filter block, the inData blocks are already flipped in
...@@ -183,8 +182,8 @@ def analysis_dft_filterbank(x, Ndown, Ndft, coefs, structure, commutator=None, v ...@@ -183,8 +182,8 @@ def analysis_dft_filterbank(x, Ndown, Ndft, coefs, structure, commutator=None, v
# Time (m) and bin (k) dependend phase shift W_K^(kmM) by circular # Time (m) and bin (k) dependend phase shift W_K^(kmM) by circular
# time shift of DFT input # time shift of DFT input
tShift = b * Ndown # roll is modulo Ndft tShift = b * Ndown
pfsShifted = np.roll(pfsData, -tShift) pfsShifted = np.roll(pfsData, -tShift) # roll is modulo Ndft
# For 'ccw' apply IDFT, for 'cw' apply DFT(fold()), due to fold() # For 'ccw' apply IDFT, for 'cw' apply DFT(fold()), due to fold()
# both yield identical result # both yield identical result
...@@ -202,9 +201,9 @@ def analysis_dft_filterbank(x, Ndown, Ndft, coefs, structure, commutator=None, v ...@@ -202,9 +201,9 @@ def analysis_dft_filterbank(x, Ndown, Ndft, coefs, structure, commutator=None, v
exit('ERROR: invalid structure ' + str(structure)) exit('ERROR: invalid structure ' + str(structure))
if verbosity: if verbosity:
Ros = Ndft / Ndown
print('> analysis_dft_filterbank():') print('> analysis_dft_filterbank():')
print(' . len(x) =', str(len(x))) print(' . len(x) =', str(len(x)))
print(' . Nx =', str(Nx))
print(' . Nblocks =', str(Nblocks)) print(' . Nblocks =', str(Nblocks))
print(' . Ros =', str(Ros)) print(' . Ros =', str(Ros))
print(' . Ndown =', str(Ndown)) print(' . Ndown =', str(Ndown))
...@@ -230,8 +229,8 @@ def synthesis_dft_filterbank(Xbase, Nup, Ndft, coefs, structure, commutator=None ...@@ -230,8 +229,8 @@ def synthesis_dft_filterbank(Xbase, Nup, Ndft, coefs, structure, commutator=None
. Ndft: DFT size, number of polyphases in PFS FIR filter . Ndft: DFT size, number of polyphases in PFS FIR filter
. coefs: prototype LPF FIR filter coefficients for anti aliasing and . coefs: prototype LPF FIR filter coefficients for anti aliasing and
interpolating BPF interpolating BPF
. structure: 'wola' or 'pfs' . structure: 'wola'
. commutator: 'ccw' to use DFT or 'cw' to use IDFT . commutator: 'cw' to use DFT or 'ccw' to use IDFT
- verbosity: when > 0 print() status, else no print() - verbosity: when > 0 print() status, else no print()
Return: Return:
. y: Upsampled and upconverted output signal y[n]. . y: Upsampled and upconverted output signal y[n].
...@@ -248,14 +247,24 @@ def synthesis_dft_filterbank(Xbase, Nup, Ndft, coefs, structure, commutator=None ...@@ -248,14 +247,24 @@ def synthesis_dft_filterbank(Xbase, Nup, Ndft, coefs, structure, commutator=None
# Adjust LO phase for group delay of analysis LPF and synthesis LPF in # Adjust LO phase for group delay of analysis LPF and synthesis LPF in
# series. This is not needed in practise, but is needed to be able to # series. This is not needed in practise, but is needed to be able to
# exactly compare reconstructed signal with original input time series # exactly compare reconstructed signal with original input time series
# signal. Assume coefs has group delay (len(coefs) - 1) / 2. # signal. Assume analysis coefs and synthesis coefs have linear phase and
# same length (but not necessarily same coefs), then the total group
# delay is (len(coefs) - 1) / 2 + (len(coefs) - 1) / 2.
hPairGroupDelay = len(coefs) - 1 hPairGroupDelay = len(coefs) - 1
# Oversampling analysis DFT filterbank # Oversampling synthesis DFT filterbank
if structure == 'wola': if structure == 'wola':
# >>> Use WOLA structure # >>> Use WOLA structure [CROCHIERE Fig 7.20]
# Use PFS with Ndft polyphases for delay line and window # Apply the coefs as a window conform [CROCHIERE Eq 7.76].
# Use PFS with Ndft polyphases for delay line and coefs window
pfs = PolyPhaseFirFilterStructure(Ndft, coefs) pfs = PolyPhaseFirFilterStructure(Ndft, coefs)
# Use coefs as window
wCoefs = coefs
# Get parameters
Ncoefs = pfs.Ncoefs
Ntaps = pfs.Ntaps
Ndelays = pfs.Ndelays # zero padded coefs
# PFB loop per Nup output y samples # PFB loop per Nup output y samples
for b in range(Nblocks): for b in range(Nblocks):
...@@ -270,51 +279,37 @@ def synthesis_dft_filterbank(Xbase, Nup, Ndft, coefs, structure, commutator=None ...@@ -270,51 +279,37 @@ def synthesis_dft_filterbank(Xbase, Nup, Ndft, coefs, structure, commutator=None
# Apply filter group delay (hPairGroupDelay), and time (m) and bin # Apply filter group delay (hPairGroupDelay), and time (m) and bin
# (k) dependend phase shift W_K^(kmM) by circular time shift of # (k) dependend phase shift W_K^(kmM) by circular time shift of
# DFT output # DFT output. To get from fixed time reference to sliding time
tShift = b * Nup - hPairGroupDelay # roll is modulo Ndft # reference.
xShifted = np.roll(xTime, -tShift) tShift = b * Nup - hPairGroupDelay
xShifted = np.roll(xTime, -tShift) # roll is modulo Ndft
# Load xTime at each tap in polyDelays # 1) Variant using PFS.
# Load xTime at Ntaps in polyDelays
pfs.parallel_load(xShifted) pfs.parallel_load(xShifted)
# Apply FIR coefs per delay element # Apply FIR coefs per delay element
zPoly = Nup * pfs.polyDelays * pfs.polyCoefs zPoly = Nup * pfs.polyDelays * pfs.polyCoefs
zLine = pfs.map_to_delay_line(zPoly) zLine = pfs.map_to_delay_line(zPoly)
zLine = zLine[0:Ncoefs]
# Overlap add weigthed input to the output # 2) Variant using a delay line like in reconstruct.m
tRange = np.arange(pfs.Ndelays) + b * Nup
y[tRange] += zLine
if b < 3:
# print(b, Xbase[:, b])
# print(b, xTime / Ndft)
# print(b, zLine / Ndft / Nup)
# print(b, y[tRange] / Ndft / Nup)
pass
elif structure == 'bunton':
# >>> Use WOLA structure as in reconstruct.m by John Bunton
Ncoefs = len(coefs)
Ntaps = ceil_div(Ncoefs, Ndft) # taps, columns
Ndelays = Ntaps * Ndft # zero padded coefs
# Use coefs as window
wCoefs = coefs
# wCoefs = np.flip(coefs)
for b in range(Nblocks):
# IDFT
xTime = Ndft * np.real(np.fft.ifft(Xbase[:, b]))
# Sliding time reference
tShift = b * Nup - hPairGroupDelay # roll is modulo Ndft
xShifted = np.roll(xTime, -tShift)
# Copy data at Ntaps # Copy data at Ntaps
zLine = np.zeros(Ndelays, dtype='float') dLine = np.zeros(Ndelays, dtype='float')
for t in range(Ntaps): for t in range(Ntaps):
zLine[np.arange(Ndft) + t * Ndft] = xShifted dLine[np.arange(Ndft) + t * Ndft] = xShifted
# Apply window # Apply window
yLine = np.zeros(Ncoefs, dtype='float') yLine = np.zeros(Ncoefs, dtype='float')
yLine = Nup * zLine[0:Ncoefs] * wCoefs yLine = Nup * dLine[0:Ncoefs] * wCoefs
# Verify that 1) PFS and 2) delay line yield same result
# . Both structures are in fact identical, because both use
# parallel load of the xShifted data.
if not np.allclose(zLine, yLine):
exit('ERROR: wrong sythesis WOLA')
# Overlap add weigthed input to the output # Overlap add weigthed input to the output
tRange = np.arange(Ncoefs) + b * Nup tRange = np.arange(pfs.Ncoefs) + b * Nup
y[tRange] += yLine y[tRange] += zLine
else: else:
# No 'pfs' for fractional oversampled synthesis, because the # No 'pfs' for fractional oversampled synthesis, because the
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment