Skip to content
Snippets Groups Projects
Commit 98a70602 authored by Eric Kooistra's avatar Eric Kooistra
Browse files

Add maximal_upsample_bpf() and zeros().

parent c3a6d483
No related branches found
No related tags found
1 merge request!419Resolve RTSD-265
...@@ -43,6 +43,13 @@ from .utilities import c_rtol, c_atol, ceil_div ...@@ -43,6 +43,13 @@ from .utilities import c_rtol, c_atol, ceil_div
c_firA = [1.0] # FIR b = coefs, a = 1 c_firA = [1.0] # FIR b = coefs, a = 1
def zeros(shape, cmplx=False):
if cmplx:
return np.zeros(shape, dtype='cfloat')
else:
return np.zeros(shape)
def down(x, D, phase=0): def down(x, D, phase=0):
"""Downsample x[n] by factor D, xD[m] = x[m D], m = n // D """Downsample x[n] by factor D, xD[m] = x[m D], m = n // D
...@@ -57,7 +64,7 @@ def up(x, U): ...@@ -57,7 +64,7 @@ def up(x, U):
After every sample in x insert U-1 zeros. After every sample in x insert U-1 zeros.
""" """
xU = np.zeros(len(x) * U) xU = zeros(len(x) * U, np.iscomplexobj(x))
xU[0::U] = x xU[0::U] = x
return xU return xU
...@@ -75,6 +82,9 @@ class PolyPhaseFirFilterStructure: ...@@ -75,6 +82,9 @@ class PolyPhaseFirFilterStructure:
using the FIR filter. The spectral aliasing and spectral replication are using the FIR filter. The spectral aliasing and spectral replication are
due to the sample rate change, not to the implementation structure. due to the sample rate change, not to the implementation structure.
The FIR data can be real to process real data, or complex to process
equivalent baseband data. The FIR coefficients are real.
The PFS is is suitable for downsampling and for upsampling: The PFS is is suitable for downsampling and for upsampling:
- Upsampling uses the PFS as Transposed Direct-Form FIR filter, where the - Upsampling uses the PFS as Transposed Direct-Form FIR filter, where the
commutator selects the polyphases from phase 0 to Nphases - 1 to yield commutator selects the polyphases from phase 0 to Nphases - 1 to yield
...@@ -89,9 +99,11 @@ class PolyPhaseFirFilterStructure: ...@@ -89,9 +99,11 @@ class PolyPhaseFirFilterStructure:
VAIDYANATHAN 4.3]. VAIDYANATHAN 4.3].
Input: Input:
. coefs: FIR coefficients b[0 : Nphases * Ntaps - 1]. . coefs: Real FIR coefficients b[0 : Nphases * Ntaps - 1].
. Nphases: number of polyphases, is number of rows (axis=0) in the . Nphases: number of polyphases, is number of rows (axis=0) in the
polyphase structure. polyphase structure.
. cmplx: Define PFS delays for real (when False) or complex (when True)
FIR data.
Derived: Derived:
. Ntaps: number of taps per polyphase, is number of columns (axis=1) in . Ntaps: number of taps per polyphase, is number of columns (axis=1) in
the polyphase structure. the polyphase structure.
...@@ -148,11 +160,12 @@ class PolyPhaseFirFilterStructure: ...@@ -148,11 +160,12 @@ class PolyPhaseFirFilterStructure:
. map_to_delay_line() . map_to_delay_line()
. map_to_poly_delays() . map_to_poly_delays()
""" """
def __init__(self, Nphases, coefs): def __init__(self, Nphases, coefs, cmplx=False):
self.coefs = coefs self.coefs = coefs
self.Ncoefs = len(coefs) self.Ncoefs = len(coefs)
self.Nphases = Nphases # branches, rows self.Nphases = Nphases # branches, rows
self.Ntaps = ceil_div(self.Ncoefs, Nphases) # taps, columns self.Ntaps = ceil_div(self.Ncoefs, Nphases) # taps, columns
self.cmplx = cmplx
self.init_poly_coeffs() self.init_poly_coeffs()
self.reset_poly_delays() self.reset_poly_delays()
...@@ -177,12 +190,12 @@ class PolyPhaseFirFilterStructure: ...@@ -177,12 +190,12 @@ class PolyPhaseFirFilterStructure:
Nphases = 4 rows (axis=0) Nphases = 4 rows (axis=0)
Ntaps = 3 columns (axis=1) Ntaps = 3 columns (axis=1)
""" """
polyCoefs = np.zeros(self.Nphases * self.Ntaps) polyCoefs = np.zeros(self.Nphases * self.Ntaps) # real
polyCoefs[0 : self.Ncoefs] = self.coefs polyCoefs[0 : self.Ncoefs] = self.coefs
self.polyCoefs = polyCoefs.reshape((self.Ntaps, self.Nphases)).T self.polyCoefs = polyCoefs.reshape((self.Ntaps, self.Nphases)).T
def reset_poly_delays(self): def reset_poly_delays(self):
self.polyDelays = np.zeros((self.Nphases, self.Ntaps)) self.polyDelays = zeros((self.Nphases, self.Ntaps), self.cmplx)
def map_to_delay_line(self): def map_to_delay_line(self):
delayLine = self.polyDelays.T.reshape(-1) delayLine = self.polyDelays.T.reshape(-1)
...@@ -280,7 +293,7 @@ def polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros): ...@@ -280,7 +293,7 @@ def polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros):
# . prepend x with Ndown - 1 zeros # . prepend x with Ndown - 1 zeros
# . skip any last remaining samples from x, that are not enough yield a new # . skip any last remaining samples from x, that are not enough yield a new
# output FIR sum. # output FIR sum.
polyX = np.zeros(Ndown * Nxp) polyX = zeros(Ndown * Nxp, np.iscomplexobj(x))
polyX[Ndown - 1] = x[0] polyX[Ndown - 1] = x[0]
polyX[Ndown:] = x[1 : Nx] polyX[Ndown:] = x[1 : Nx]
# . Store data in time order per branch, so with oldest data left, to match # . Store data in time order per branch, so with oldest data left, to match
...@@ -333,7 +346,7 @@ def polyphase_frontend(x, Nphases, coefs, sampling): ...@@ -333,7 +346,7 @@ def polyphase_frontend(x, Nphases, coefs, sampling):
polyphase. polyphase.
""" """
# Use polyphase FIR filter coefficients from pfs class. # Use polyphase FIR filter coefficients from pfs class.
pfs = PolyPhaseFirFilterStructure(Nphases, coefs) pfs = PolyPhaseFirFilterStructure(Nphases, coefs, np.iscomplexobj(x))
# Define polyphases for whole data signal x with Nx samples, instead of # Define polyphases for whole data signal x with Nx samples, instead of
# using polyDelays per Ntaps from pfs class, to be able to use # using polyDelays per Ntaps from pfs class, to be able to use
...@@ -346,10 +359,11 @@ def polyphase_frontend(x, Nphases, coefs, sampling): ...@@ -346,10 +359,11 @@ def polyphase_frontend(x, Nphases, coefs, sampling):
# each use all x to yield Nup output samples per input sample from x. # each use all x to yield Nup output samples per input sample from x.
# The commutator index order for upsampling is p = 0, 1, .., Nup - 1, # The commutator index order for upsampling is p = 0, 1, .., Nup - 1,
# so from top to bottom in the PFS. # so from top to bottom in the PFS.
polyY = np.zeros((Nup, Nx)) polyY = zeros((Nup, Nx), np.iscomplexobj(x))
pCommutator = range(Nup) pCommutator = range(Nup)
for p in pCommutator: for p in pCommutator:
polyY[p] = signal.lfilter(pfs.polyCoefs[p], c_firA, x) polyY[p] = Nup * signal.lfilter(pfs.polyCoefs[p], c_firA, x)
else: else:
# 'down': # 'down':
# . Prepend x with Ndown - 1 zeros, to have y[m] for m = 0 start at # . Prepend x with Ndown - 1 zeros, to have y[m] for m = 0 start at
...@@ -366,7 +380,7 @@ def polyphase_frontend(x, Nphases, coefs, sampling): ...@@ -366,7 +380,7 @@ def polyphase_frontend(x, Nphases, coefs, sampling):
# commutator index order is only relevant when the branches are # commutator index order is only relevant when the branches are
# calculated sequentially to reuse the same hardware, because for the # calculated sequentially to reuse the same hardware, because for the
# output y the branches are summed anyway. # output y the branches are summed anyway.
polyY = np.zeros((Ndown, Nxp)) polyY = zeros((Ndown, Nxp), np.iscomplexobj(x))
pCommutator = np.flip(np.arange(Ndown)) pCommutator = np.flip(np.arange(Ndown))
for p in pCommutator: for p in pCommutator:
polyY[p] = signal.lfilter(pfs.polyCoefs[p], c_firA, polyX[p]) polyY[p] = signal.lfilter(pfs.polyCoefs[p], c_firA, polyX[p])
...@@ -418,10 +432,10 @@ def upsample(x, Nup, coefs, verify=False, verbosity=1): # interpolate ...@@ -418,10 +432,10 @@ def upsample(x, Nup, coefs, verify=False, verbosity=1): # interpolate
# Inefficient upsampling implementation with multiply by zero values. # Inefficient upsampling implementation with multiply by zero values.
# Verify that x --> U --> LPF --> y yields identical result y as with # Verify that x --> U --> LPF --> y yields identical result y as with
# using the PFS: x --> PFS FIR --> y. # using the PFS: x --> PFS FIR --> y.
xZeros = np.zeros((Nup, Nx)) xZeros = zeros((Nup, Nx), np.iscomplexobj(x))
xZeros[0] = x xZeros[0] = x
xZeros = xZeros.T.reshape(1, Nup * Nx)[0] # upsample xZeros = xZeros.T.reshape(1, Nup * Nx)[0] # upsample
yVerify = signal.lfilter(coefs, c_firA, xZeros) # LPF yVerify = Nup * signal.lfilter(coefs, c_firA, xZeros) # LPF
print('> Verify upsample():') print('> Verify upsample():')
if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol): if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
print(' . PASSED: correct upsample result') print(' . PASSED: correct upsample result')
...@@ -481,7 +495,7 @@ def downsample(x, Ndown, coefs, verify=False, verbosity=1): # decimate ...@@ -481,7 +495,7 @@ def downsample(x, Ndown, coefs, verify=False, verbosity=1): # decimate
# Inefficient downsampling implementation with calculating values that # Inefficient downsampling implementation with calculating values that
# are removed. Verify that x --> LPF --> D --> y yields identical # are removed. Verify that x --> LPF --> D --> y yields identical
# result y as with using the PFS: x --> PFS FIR --> y. # result y as with using the PFS: x --> PFS FIR --> y.
yVerify = np.zeros(Ndown * Nxp) yVerify = zeros(Ndown * Nxp, np.iscomplexobj(x))
yVerify[0 : Nx] = signal.lfilter(coefs, c_firA, x[0 : Nx]) # LPF yVerify[0 : Nx] = signal.lfilter(coefs, c_firA, x[0 : Nx]) # LPF
yVerify = yVerify.reshape(Nxp, Ndown).T # polyphases yVerify = yVerify.reshape(Nxp, Ndown).T # polyphases
yVerify = yVerify[0] # downsample by D yVerify = yVerify[0] # downsample by D
...@@ -584,7 +598,7 @@ def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate an ...@@ -584,7 +598,7 @@ def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate an
v[0] = x v[0] = x
v = v.T.reshape(1, Nup * Nx)[0] # upsample v = v.T.reshape(1, Nup * Nx)[0] # upsample
# . LPF # . LPF
w = signal.lfilter(coefs, c_firA, v) w = Nup * signal.lfilter(coefs, c_firA, v)
# . Downsampling with calculating values that are removed # . Downsampling with calculating values that are removed
yVerify = np.zeros(Ndown * Nyp) yVerify = np.zeros(Ndown * Nyp)
yVerify[0 : Ny] = w[0 : Ny] yVerify[0 : Ny] = w[0 : Ny]
...@@ -644,7 +658,8 @@ def time_shift_phasor_arr(k, Ndown, Ndft, Msamples): ...@@ -644,7 +658,8 @@ def time_shift_phasor_arr(k, Ndown, Ndft, Msamples):
def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1):
"""BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown. """BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown and
Ndown = Ndft.
Implement maximal downsampling down converter for one bin (= critically Implement maximal downsampling down converter for one bin (= critically
sampled) [HARRIS Fig 6.14]. sampled) [HARRIS Fig 6.14].
...@@ -670,8 +685,8 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): ...@@ -670,8 +685,8 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1):
# Phase rotate per polyphase for bin k, due to delay line at branch inputs # Phase rotate per polyphase for bin k, due to delay line at branch inputs
# [HARRIS Eq 6.8] # [HARRIS Eq 6.8]
polyYC = np.zeros((Ndown, Nxp), dtype='cfloat')
kPhasors = unit_circle_loops_phasor_arr(k, Ndown, 'positive') kPhasors = unit_circle_loops_phasor_arr(k, Ndown, 'positive')
polyYC = np.zeros((Ndown, Nxp), dtype='cfloat')
for p in range(Ndown): for p in range(Ndown):
polyYC[p] = polyY[p] * kPhasors[p] # row = row * scalar polyYC[p] = polyY[p] * kPhasors[p] # row = row * scalar
...@@ -691,6 +706,55 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): ...@@ -691,6 +706,55 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1):
return yc return yc
def maximal_upsample_bpf(x, Nup, k, coefs, verbosity=1):
"""BPF x at bin k in range(Ndft) and downsample x by factor D = Nup
= Ndft.
Implement maximal upsampling upconverter for one bin (= critically
sampled) [HARRIS Fig. 7.16].
The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of
frequency bins, is DFT size. The upsampling is maximal so Nup = Ndft.
The polyphase structure has Nphases = Nup branches, so the input x
data that shifts in remains in each branch. Therefore each branch can be
FIR filtered independently for the whole input x using polyX.
Input:
. x: Input signal x[n]
. Nup: upsample factor
. k: Index of BPF center frequency w_k = 2 pi k / Nup
. coefs: prototype FIR filter coefficients for anti aliasing and
interpolating BPF
- verbosity: when > 0 print() status, else no print()
Return:
. yc: Upsampled and upconverted output signal yc[n], n = m D for
bin k. Complex signal, so positive frequencies only.
"""
# Polyphase FIR filter input x
polyY, Nx, Nxp = polyphase_frontend(x, Nup, coefs, 'up')
# Phase rotate per polyphase for bin k, due to delay line at branch inputs
# [HARRIS Eq 7.8 = 6.8]
kPhasors = unit_circle_loops_phasor_arr(k, Nup, 'positive')
polyYC = np.zeros((Nup, Nxp), dtype='cfloat')
for p in range(Nup):
polyYC[p] = polyY[p] * kPhasors[p] # row = row * scalar
# Output Nup samples yc[m] for every input sample x[n]
yc = polyYC.T.reshape(1, Nup * Nx)[0]
if verbosity:
print('> Log maximal_downsample_bpf():')
print(' . len(x) =', str(len(x)))
print(' . Nx =', str(Nx))
print(' . Nxp =', str(Nxp))
print(' . len(yc) =', str(len(yc))) # = Nxp
print(' . Nup =', str(Nup))
print(' . k =', str(k))
print('')
return yc
def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1): def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1):
"""BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown """BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment