Skip to content
Snippets Groups Projects
Commit 61d84513 authored by Eric Kooistra's avatar Eric Kooistra
Browse files

Add draft non_maximal_upsample_bpf(). Support up transposed FIR structure in filter_block().

parent ebe4beaa
No related branches found
No related tags found
1 merge request!419Resolve RTSD-265
......@@ -50,6 +50,10 @@ def zeros(shape, cmplx=False):
return np.zeros(shape)
def ones(shape, cmplx=False):
return zeros(shape, cmplx) + 1
def down(x, D, phase=0):
"""Downsample x[n] by factor D, xD[m] = x[m D], m = n // D
......@@ -140,7 +144,7 @@ class PolyPhaseFirFilterStructure:
delayLine = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11]
For polyDelays shift in from left-top (n = 15 is newest input sample)
and shift-out from right-bottom (n = 4 is oldest sample).
and shift out from right-bottom (n = 4 is oldest sample).
v v
polyDelays = [[0, 4, 8], ((15,11, 7),
......@@ -204,7 +208,7 @@ class PolyPhaseFirFilterStructure:
def map_to_poly_delays(self, delayLine):
self.polyDelays = delayLine.reshape((self.Ntaps, self.Nphases)).T
def shift_in_data(self, inData, flipped=False):
def shift_in_data(self, inData, flipped=True):
"""Shift block of data into the polyDelays structure.
View polyDelays as delay line if L < Nphases. Shift in from the left
......@@ -234,22 +238,33 @@ class PolyPhaseFirFilterStructure:
self.polyDelays = np.roll(self.polyDelays, 1, axis=1)
self.polyDelays[:, 0] = xData
def filter_block(self, inData, flipped=False):
"""Filter block of inData per polyphase.
def filter_block(self, inData, sampling, flipped=True):
"""Filter block of input data per polyphase for downsampling or for
upsampling.
Input:
. inData: block of input data with length <= Nphases, time index n in
inData[n] increments, so inData[0] is oldest data and inData[-1] is
newest data. The inData block size is = 1 for upsampling or > 1
and <= Nphases for downsampling.
. inData: block of input data for downsampling or single input data for
upsampling.
. sampling: 'down' or 'up'
- 'down': for downsampling inData[n] is a block of length L time
samples, with 1 <= L <= Nphases [HARRIS Fig. 6.12, 6.13]. The time
index n in inData[n] increments, so inData[0] is oldest data and
inData[-1] is newest data when flipped is False.
- 'up': for upsampling inData length L = 1 sample [HARRIS Fig. 7.11,
7.12]. The inData is fanout to the Nphases.
. flipped: False then inData order is inData[n] for downsampling and
still needs to be flipped. If True then the inData is already
flipped. Not used for upsampling.
Return:
. pfsData: block of polyphase FIR filtered output data for Nphase, with
. pfsData: block of polyphase FIR filtered output data for Nphases, with
pfsData[p] and p = 0:Nphases-1 from top to bottom.
. flipped: False then inData order is inData[n] and still needs to be
flipped. If True then the inData is already flipped.
"""
# Shift in one block of input data (1 <= len(inData) <= Nphases)
self.shift_in_data(inData, flipped)
if sampling == 'down':
self.shift_in_data(inData, flipped)
else: # 'up'
inWires = inData * ones(self.Nphases, np.iscomplexobj(inData))
self.shift_in_data(inWires, flipped=True)
# Apply FIR coefs per delay element
zData = self.polyDelays * self.polyCoefs
# Sum FIR taps per polyphase
......@@ -299,7 +314,7 @@ def polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros):
# . Store data in time order per branch, so with oldest data left, to match
# use with lfilter(). Note this differs from order of tap data in
# pfs.polyDelays where newest data is left, because the block data is
# flipped by shift_in_data(), to match use in pfs.filter_block, so that
# flipped by shift_in_data(), to match use in pfs.filter_block(), so that
# it can use pfs.polyDelays * pfs.polyCoefs to filter the block.
# . The newest sample x[-1] has to be in top branch of polyX and the oldest
# sample x[0] in bottom branch, to match branch order of 0:Ndown-1 from
......@@ -623,37 +638,45 @@ def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate an
###############################################################################
# Single bandpass channel up and down sampling and up and down conversion
# Single bandpass channel up and downsampling and up and downconversion
###############################################################################
def unit_circle_loops_phasor_arr(k, N, sign):
def unit_circle_loops_phasor_arr(k, N, sign=1):
"""Return array of N phasors on k loops along the unit circle.
Polyphase dependent phase offsets for bin k, when N = Ndown = Ndft [HARRIS
Eq 6.8]. For k = 1 this yields the roots of unity.
Input:
. k: bin in range(N)
. N: number of phasors
. sign: +1 or -1 term in exp()
Return:
. pArr: exp(+-j 2pi k / N) for k in 0 : N - 1
. pArr: exp(sign 2j pi k / N) for k in 0 : N - 1
"""
if sign == 'positive':
pArr = np.array([np.exp(2j * np.pi * p * k / N) for p in range(N)])
else: # 'negative'
pArr = np.array([np.exp(-2j * np.pi * p * k / N) for p in range(N)])
pArr = np.array([np.exp(sign * 2j * np.pi * p * k / N) for p in range(N)])
return pArr
def time_shift_phasor_arr(k, Ndown, Ndft, Msamples):
def time_shift_phasor_arr(k, M, Ndft, Msamples, sign):
"""Return array of Msamples phasors in time to compensate for oversampling
time shift.
The time shift due to downsampling causes a frequency component of
k * Ndown / Ndft. With oversampling Ndown < Ndft, and then after
downsampling there remains a frequency offset [HARRIS Eq 9.3].
The time shift due to down or upsampling causes a frequency component of
k * M / Ndft. With oversampling M < Ndft, and then after down or upsampling
there remains a frequency offset that can be compensated by given by mArr
[HARRIS Eq 9.3].
Input:
. k: Index of BPF center frequency w_k = 2 pi k / Ndft
. M: downsample or upsample factor
. Ndft: DFT size, number of bins and number of polyphases in PFS FIR filter
. Msamples: Requested number of output samples in time
. sign: +1 or -1 term in exp()
Return:
. mArr = exp(2j pi k * Ndown / Ndft * m), for m in 0 : Msamples - 1
. mArr = exp(sign 2j pi k * M / Ndft * m), for m in 0 : Msamples - 1
"""
mArr = np.exp(-2j * np.pi * k * Ndown / Ndft * np.arange(Msamples))
mArr = np.exp(sign * 2j * np.pi * k * M / Ndft * np.arange(Msamples))
return mArr
......@@ -661,14 +684,15 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1):
"""BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown and
Ndown = Ndft.
Implement maximal downsampling down converter for one bin (= critically
Implement maximal downsampling downconverter for one bin (= critically
sampled) [HARRIS Fig 6.14].
The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of
frequency bins, is DFT size. The downsampling is maximal so Ndown = Ndft.
The polyphase structure has Nphases = Ndown branches, so the input x
data that shifts in remains in each branch. Therefore each branch can be
FIR filtered independently for the whole input x using polyX.
FIR filtered independently for the whole input x using
polyphase_frontend().
Input:
. x: Input signal x[n]
......@@ -677,7 +701,7 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1):
. coefs: prototype FIR filter coefficients for anti aliasing BPF
- verbosity: when > 0 print() status, else no print()
Return:
. yc: Downsampled and down converted output signal yc[m], m = n // D for
. yc: Downsampled and downconverted output signal yc[m], m = n // D for
bin k. Complex baseband signal.
"""
# Polyphase FIR filter input x
......@@ -685,14 +709,14 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1):
# Phase rotate per polyphase for bin k, due to delay line at branch inputs
# [HARRIS Eq 6.8]
kPhasors = unit_circle_loops_phasor_arr(k, Ndown, 'positive')
polyYC = np.zeros((Ndown, Nxp), dtype='cfloat')
kPhasors = unit_circle_loops_phasor_arr(k, Ndown)
polyYc = np.zeros((Ndown, Nxp), dtype='cfloat')
for p in range(Ndown):
polyYC[p] = polyY[p] * kPhasors[p] # row = row * scalar
polyYc[p] = polyY[p] * kPhasors[p] # row = row * scalar
# Sum the branch outputs to get single downsampled and downconverted output
# complex baseband value yc.
yc = np.sum(polyYC, axis=0)
yc = np.sum(polyYc, axis=0)
if verbosity:
print('> Log maximal_downsample_bpf():')
......@@ -706,60 +730,65 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1):
return yc
def maximal_upsample_bpf(x, Nup, k, coefs, verbosity=1):
"""BPF x at bin k in range(Ndft) and downsample x by factor D = Nup
= Ndft.
def maximal_upsample_bpf(xBase, Nup, k, coefs, verbosity=1):
"""BPF xBase at bin k in range(Ndft), and upsample xBase by factor U = Nup =
Ndft.
Implement maximal upsampling upconverter for one bin (= critically
sampled) [HARRIS Fig. 7.16].
The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of
frequency bins, is DFT size. The upsampling is maximal so Nup = Ndft.
The polyphase structure has Nphases = Nup branches, so the input x
data that shifts in remains in each branch. Therefore each branch can be
FIR filtered independently for the whole input x using polyX.
frequency bins, is DFT size. The upsampling is maximal so Nup = Ndft. The
polyphase structure has Nphases = Nup branches, so the output yc data
shifts out from the same branch for each block of Nup samples. Therefore
each branch can be FIR filtered independently for the whole input xBase
using polyphase_frontend().
Input:
. x: Input signal x[n]
. xBase: Input equivalent baseband signal
. Nup: upsample factor
. k: Index of BPF center frequency w_k = 2 pi k / Nup
. coefs: prototype FIR filter coefficients for anti aliasing and
interpolating BPF
- verbosity: when > 0 print() status, else no print()
Return:
. yc: Upsampled and upconverted output signal yc[n], n = m D for
bin k. Complex signal, so positive frequencies only.
. yc: Upsampled and upconverted output signal yc[n], n = m U at
intermediate frequency (IF) of bin k. Complex positive frequencies
only signal.
"""
# Polyphase FIR filter input x
polyY, Nx, Nxp = polyphase_frontend(x, Nup, coefs, 'up')
# Polyphase FIR filter input xBase
polyY, Nx, Nxp = polyphase_frontend(xBase, Nup, coefs, 'up')
# Phase rotate per polyphase for bin k, due to delay line at branch inputs
# [HARRIS Eq 7.8 = 6.8]
kPhasors = unit_circle_loops_phasor_arr(k, Nup, 'positive')
polyYC = np.zeros((Nup, Nxp), dtype='cfloat')
kPhasors = unit_circle_loops_phasor_arr(k, Nup)
polyYc = np.zeros((Nup, Nxp), dtype='cfloat')
for p in range(Nup):
polyYC[p] = polyY[p] * kPhasors[p] # row = row * scalar
polyYc[p] = polyY[p] * kPhasors[p] # row = row * scalar
# Output Nup samples yc[m] for every input sample x[n]
yc = polyYC.T.reshape(1, Nup * Nx)[0]
# Output Nup samples yc[m] for every input sample xBase[n]
yc = polyYc.T.reshape(1, Nup * Nx)[0]
if verbosity:
print('> Log maximal_downsample_bpf():')
print(' . len(x) =', str(len(x)))
print(' . Nx =', str(Nx))
print(' . Nxp =', str(Nxp))
print(' . len(yc) =', str(len(yc))) # = Nxp
print(' . Nup =', str(Nup))
print(' . k =', str(k))
print(' . len(xBase) =', str(len(xBase)))
print(' . Nx =', str(Nx))
print(' . Nxp =', str(Nxp))
print(' . len(yc) =', str(len(yc))) # = Nxp
print(' . Nup =', str(Nup))
print(' . k =', str(k))
print('')
return yc
def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1):
"""BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown
"""BPF x at bin k in range(Ndown), and downsample x by factor D = Ndown.
Implement nonmaximal downsampling down converter for one bin, extend
[HARRIS Fig 6.14].
Implement nonmaximal downsampling downconverter for one bin. The maximal
downsampling downconverter for one bin has the kPhasors per polyphase
branch of [HARRIS Eq. 6.8 and Fig 6.14]. For nonmaximal this needs to be
extended with the tPhasors per polyphase branch of [HARRIS Eq. 9.3, to
compensate for the oversampling time shift.
The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of
frequency bins, is DFT size. The polyphase FIR structure has Nphases = Ndft
......@@ -779,7 +808,7 @@ def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1):
. coefs: prototype LPF FIR filter coefficients for anti aliasing BPF
- verbosity: when > 0 print() status, else no print()
Return:
. yc: Downsampled and down converted output signal yc[m], m = n // D for
. yc: Downsampled and downconverted output signal yc[m], m = n // D for
bin k. Complex baseband signal.
"""
# Prepend x with Ndown - 1 zeros, and represent x in Nblocks of Ndown
......@@ -793,15 +822,15 @@ def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1):
# PFS with Ndft polyphases
pfs = PolyPhaseFirFilterStructure(Ndft, coefs)
kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 'positive') # [HARRIS Eq 6.8]
kPhasors = unit_circle_loops_phasor_arr(k, Ndft) # [HARRIS Eq 6.8]
# Oversampling time shift compensation via frequency dependent phase shift
tPhasors = time_shift_phasor_arr(k, Ndown, Ndft, Nblocks) # [HARRIS Eq 9.3]
tPhasors = time_shift_phasor_arr(k, Ndown, Ndft, Nblocks, -1) # [HARRIS Eq 9.3]
for b in range(Nblocks):
# Filter block
inData = xBlocks[:, b]
pfsData = pfs.filter_block(inData, flipped=True)
pfsData = pfs.filter_block(inData, 'down', flipped=True)
# Phase rotate polyphases for bin k
pfsBinData = pfsData * kPhasors * tPhasors[b] # [HARRIS Eq 6.8, 9.3]
# Sum the polyphases to get single downsampled and downconverted output
......@@ -819,3 +848,79 @@ def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1):
print(' . k =', str(k))
print('')
return yc
def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1):
"""BPF xBase at bin k in range(Nup), and upsample xBase by factor U = Nup.
Implement nonmaximal upsampling upconverter for one bin. The maximal
upsampling upconverter for one bin has the kPhasors per polyphase
branch similar of [HARRIS Fig 7.16]. For nonmaximal this needs to be
extended with the tPhasors per polyphase branch similar as for down in
[HARRIS Eq. 9.3], to compensate for the oversampling time shift.
The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of
frequency bins, is DFT size. The polyphase FIR structure has Nphases = Ndft
branches, to fit the requested number of bins. The polyphase FIR structure
is maximally upsampled (= critically sampled) for Nup = Ndft, but it
can support any Nup <= Ndft. The output data shifts out per Nup samples,
so it appears from different branches when Nup < Ndft and a new block is
shifted out. Therefore the output data cannot be FIR filtered per branch
for the whole input xBase. Instead it needs to be FIR filtered per block of
Nup output samples, using pfs.polyDelays in pfs.filter_block().
Input:
. xBase: Input equivalent baseband signal xBase[m]
. Nup: upsample factor
. k: Index of BPF center frequency w_k = 2 pi k / Ndft
. Ndft: DFT size, number of polyphases in PFS FIR filter
. coefs: prototype LPF FIR filter coefficients for anti aliasing and
interpolationBPF
- verbosity: when > 0 print() status, else no print()
Return:
. yc: Upsampled and upconverted output signal yc[n], n = m U at
intermediate frequency (IF) of bin k. Complex positive frequencies
only signal.
"""
Nblocks = len(xBase)
# Prepare output
polyYc = np.zeros((Nup, Nblocks), dtype='cfloat')
# PFS with Ndft polyphases
pfs = PolyPhaseFirFilterStructure(Ndft, coefs, cmplx=True)
kPhasors = unit_circle_loops_phasor_arr(k, Ndft) # [HARRIS Eq 7.8]
# Oversampling time shift compensation via frequency dependent phase shift
tPhasors = time_shift_phasor_arr(k, Nup, Ndft, Nblocks, -1) # [HARRIS Eq 9.3]
# Polyphase FIR filter input xBase
nLo = 0
for b in range(Nblocks):
# Filter block
inData = xBase[b]
pfsData = Nup * pfs.filter_block(inData, 'up')
# Phase rotate polyphases for bin k
pfsBinData = pfsData * kPhasors * tPhasors[b] # [HARRIS Eq 7.8, 9.3]
# Output Nup samples yc[n] for every input sample xBase[m]
nEnd = nLo + Nup
if nEnd <= Ndft:
outBinData = pfsBinData[nLo : nEnd]
else:
outBinData = np.concatenate((pfsBinData[nLo : Ndft],
pfsBinData[0 : nEnd - Ndft]))
nLo = nEnd % Ndft
polyYc[:, b] = outBinData
yc = polyYc.T.reshape(1, Nup * Nblocks)[0]
if verbosity:
print('> non_maximal_upsample_bpf():')
print(' . len(xBase) =', str(len(xBase)))
print(' . Nblocks =', str(Nblocks))
print(' . len(yc) =', str(len(yc))) # = Nblocks
print(' . Nup =', str(Nup))
print(' . Ndft =', str(Ndft))
print(' . k =', str(k))
print('')
return yc
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment