Skip to content
Snippets Groups Projects

Resolve RTSD-265

Merged Eric Kooistra requested to merge RTSD-265 into master
1 file
+ 120
9
Compare changes
  • Side-by-side
  • Inline
@@ -102,14 +102,54 @@ class PolyPhaseFirFilterStructure:
# newest sample [15]
# samples [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
# blocks [ 0, 1, 2, 3]
# shift blocks to next column in polyDelays
# put newest block[3] in first column of polyDelays
#
# Shift blocks to next column in polyDelays, put newest
# block[3] in first column of polyDelays:
#
# shift out, [0] is oldest input sample
# ^
# polyDelays = [[ 8, 4, 0],
# [ 9, 5, 1],
# [10, 6, 2],
# [11, 7, 3]])
# ^
# shift in, [11] is newest input sample
#
# delayLine = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] <-- shift in
#
# Store as polyDelays structure, use mapping to access as delayLine:
# . map_to_delay_line()
# . map_to_poly_delays()
self.polyDelays = np.zeros((Nphases, self.Ntaps))
def map_to_delay_line(self):
delayLine = np.flip(self.polyDelays.T, axis=0).reshape(-1)
return delayLine
def map_to_poly_delays(self, delayLine):
self.polyDelays = np.flip(delayLine.reshape((self.Ntaps, self.Nphases)).T, axis=1)
def shift_in_data(self, inData):
"""Shift block of data into polyDelays structure.
View polyDelays as delay line if L < Nphases. If L = Nphases, then it
is possible to shift the data directly into the first column of
polyDelays.
"""
L = len(inData)
if L < self.Nphases:
delayLine = self.map_to_delay_line()
# Equivalent code:
# delayLine = np.concatenate((delayLine[L:], inData))
delayLine = np.roll(delayLine, -L)
delayLine[-L:] = inData
self.map_to_poly_delays(delayLine)
else:
# Equivalent code for L == Nphases: Shift in inData block directly
# at column 0
self.polyDelays = np.roll(self.polyDelays, 1, axis=1)
self.polyDelays[:, 0] = inData
def poly_coeffs(self, commutator):
"""Polyphase structure for FIR filter coefficients coefs in Nphases
@@ -156,21 +196,22 @@ class PolyPhaseFirFilterStructure:
"""Filter block of inData per poly phase
Input:
. inData: block of Nphases input data, time index n increments, so
inData[0] is oldest data and inData[Nphases-1] is newest data.
. inData: block of input data with length <= Nphases, time index n increments,
so inData[0] is oldest data and inData[-1] is newest data. The inData
block size is >= 1 (is full rate) and <= Nphases (for maximally down
converted)
Return:
. outData: block of Nphases filtered output data with outData[0] is
oldest data and outData[Nphases-1] is newest data, so time index n
increments, like with inData.
"""
# Shift delay memory by one block
self.polyDelays = np.roll(self.polyDelays, 1, axis=1)
# Shift in inData block at column 0
self.polyDelays[:, 0] = inData
# Shift in one block of input data (1 <= len(inData) <= Nphases)
self.shift_in_data(inData)
# Apply FIR coefs per element
zData = self.polyDelays * self.polyCoefs
# Sum FIR taps per poly phase
outData = np.sum(zData, axis=1)
# Output block of Nphases filtered output data
return outData
@@ -468,7 +509,10 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1):
Implement maximal downsampling down converter for one bin [HARRIS Fig 6.14].
The downsampling is maximal so Ndown = Ndft is number of frequency bins, is DFT size.
The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of frequency bins, is DFT size. The downsampling
is maximal so Ndown = Ndft. The polyphase structure has Nphases = Ndown branches so the input x data that shifts
in remains in each branch. Therefore each branch can be FIR filtered independently for the whole input x using
polyX.
. see downsample()
@@ -525,6 +569,73 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1):
return y
def nonmaximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1):
"""BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown [HARRIS Fig 6.14]
Implement nonmaximal downsampling down converter for one bin, extend [HARRIS Fig 6.14].
The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of frequency bins, is DFT size. The polyphase
FIR structure has Nphases = Ndft branches, to support any Ndown <= Ndft. The input data shifts in per Ndown
samples, so it appears in different branches when Ndown < Ndft. Therefore the input data cannot be FIR filtered
per branch for the whole input x. Instead it needs to be FIR filtered per block of Ndown input samples from x,
using pfs.polyDelays in pfs.filter_block().
. see downsample()
Input:
. x: Input signal x[n]
. Ndown: downsample factor
. k: Index of BPF center frequency w_k = 2 pi k / Ndft
. Ndft: DFT size, number of polyphases in FIR structure
. coefs: prototype FIR filter coefficients for anti aliasing BPF
- verbosity: when > 0 print() status, else no print()
Return:
. y: Downsampled and down converted output signal y[m], m = n // D for bin
k. Complex baseband signal.
"""
a = [1.0] # FIR b = coefs, a = 1
# Polyphase implementation to avoid calculating values that are removed
# coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# polyCoefs = [[ 3, 7, 11],
# [ 2, 6, 10],
# [ 1, 5, 9],
# [ 0, 4, 8]])
pfs = PolyPhaseFirFilterStructure(Ndft, coefs, commutator='down')
polyCoefs = pfs.polyCoefs
# Poly phases for whole data signal x, prepended with Ndown - 1 zeros
polyX, Nx, Nxp = poly_structure_data_for_downsampling_whole_x(x, Ndown) # size (Ndown, Nxp), Ny = Nxp
# Filter x per polyphase, order in polyCoefs accounts for commutator [HARRIS Fig 6.12, 6.13]
polyY = np.zeros((Ndown, Nxp))
for p in range(Ndown):
polyY[p] = signal.lfilter(polyCoefs[p], a, polyX[p])
# Phase rotate per polyphase, due to delay line at branch inputs [HARRIS Eq 6.8]
# . polyY can use index p, because order in polyY accounts for commutator,
# . phase rotator needs to use pCommutator to account for commutator, to fit
# order in polyY and polyCoefs
polyYC = np.zeros((Ndown, Nxp), dtype='cfloat')
for p in range(Ndown):
pCommutator = Ndown - 1 - p
polyYC[p] = polyY[p] * np.exp(1j * 2 * np.pi * pCommutator * k / Ndown)
# Sum the branch outputs to get single downsampled and downconverted output value
y = np.sum(polyYC, axis=0)
if verbosity:
print('> downsample_bpf():')
print('. len(x) =', str(len(x)))
print('. Ndown =', str(Ndown))
print('. Nx =', str(Nx))
print('. Nxp =', str(Nxp))
print('. len(y) =', str(len(y))) # = Nxp
print('. k =', str(k))
print('')
return y
def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate and decimate by Nup / Ndown
"""Resample x by factor U / D = Nup / Ndown
Loading