Skip to content
Snippets Groups Projects
Commit dd2d6f3b authored by Eric Kooistra's avatar Eric Kooistra
Browse files

Use shift_in_data90 in filter_block().

parent 6c06d705
Branches
No related tags found
1 merge request!419Resolve RTSD-265
...@@ -102,14 +102,54 @@ class PolyPhaseFirFilterStructure: ...@@ -102,14 +102,54 @@ class PolyPhaseFirFilterStructure:
# newest sample [15] # newest sample [15]
# samples [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] # samples [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
# blocks [ 0, 1, 2, 3] # blocks [ 0, 1, 2, 3]
# shift blocks to next column in polyDelays #
# put newest block[3] in first column of polyDelays # Shift blocks to next column in polyDelays, put newest
# block[3] in first column of polyDelays:
#
# shift out, [0] is oldest input sample
# ^
# polyDelays = [[ 8, 4, 0], # polyDelays = [[ 8, 4, 0],
# [ 9, 5, 1], # [ 9, 5, 1],
# [10, 6, 2], # [10, 6, 2],
# [11, 7, 3]]) # [11, 7, 3]])
# ^
# shift in, [11] is newest input sample
#
# delayLine = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] <-- shift in
#
# Store as polyDelays structure, use mapping to access as delayLine:
# . map_to_delay_line()
# . map_to_poly_delays()
self.polyDelays = np.zeros((Nphases, self.Ntaps)) self.polyDelays = np.zeros((Nphases, self.Ntaps))
def map_to_delay_line(self):
delayLine = np.flip(self.polyDelays.T, axis=0).reshape(-1)
return delayLine
def map_to_poly_delays(self, delayLine):
self.polyDelays = np.flip(delayLine.reshape((self.Ntaps, self.Nphases)).T, axis=1)
def shift_in_data(self, inData):
"""Shift block of data into polyDelays structure.
View polyDelays as delay line if L < Nphases. If L = Nphases, then it
is possible to shift the data directly into the first column of
polyDelays.
"""
L = len(inData)
if L < self.Nphases:
delayLine = self.map_to_delay_line()
# Equivalent code:
# delayLine = np.concatenate((delayLine[L:], inData))
delayLine = np.roll(delayLine, -L)
delayLine[-L:] = inData
self.map_to_poly_delays(delayLine)
else:
# Equivalent code for L == Nphases: Shift in inData block directly
# at column 0
self.polyDelays = np.roll(self.polyDelays, 1, axis=1)
self.polyDelays[:, 0] = inData
def poly_coeffs(self, commutator): def poly_coeffs(self, commutator):
"""Polyphase structure for FIR filter coefficients coefs in Nphases """Polyphase structure for FIR filter coefficients coefs in Nphases
...@@ -156,21 +196,22 @@ class PolyPhaseFirFilterStructure: ...@@ -156,21 +196,22 @@ class PolyPhaseFirFilterStructure:
"""Filter block of inData per poly phase """Filter block of inData per poly phase
Input: Input:
. inData: block of Nphases input data, time index n increments, so . inData: block of input data with length <= Nphases, time index n increments,
inData[0] is oldest data and inData[Nphases-1] is newest data. so inData[0] is oldest data and inData[-1] is newest data. The inData
block size is >= 1 (is full rate) and <= Nphases (for maximally down
converted)
Return: Return:
. outData: block of Nphases filtered output data with outData[0] is . outData: block of Nphases filtered output data with outData[0] is
oldest data and outData[Nphases-1] is newest data, so time index n oldest data and outData[Nphases-1] is newest data, so time index n
increments, like with inData. increments, like with inData.
""" """
# Shift delay memory by one block # Shift in one block of input data (1 <= len(inData) <= Nphases)
self.polyDelays = np.roll(self.polyDelays, 1, axis=1) self.shift_in_data(inData)
# Shift in inData block at column 0
self.polyDelays[:, 0] = inData
# Apply FIR coefs per element # Apply FIR coefs per element
zData = self.polyDelays * self.polyCoefs zData = self.polyDelays * self.polyCoefs
# Sum FIR taps per poly phase # Sum FIR taps per poly phase
outData = np.sum(zData, axis=1) outData = np.sum(zData, axis=1)
# Output block of Nphases filtered output data
return outData return outData
...@@ -468,7 +509,10 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): ...@@ -468,7 +509,10 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1):
Implement maximal downsampling down converter for one bin [HARRIS Fig 6.14]. Implement maximal downsampling down converter for one bin [HARRIS Fig 6.14].
The downsampling is maximal so Ndown = Ndft is number of frequency bins, is DFT size. The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of frequency bins, is DFT size. The downsampling
is maximal so Ndown = Ndft. The polyphase structure has Nphases = Ndown branches so the input x data that shifts
in remains in each branch. Therefore each branch can be FIR filtered independently for the whole input x using
polyX.
. see downsample() . see downsample()
...@@ -525,6 +569,73 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): ...@@ -525,6 +569,73 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1):
return y return y
def nonmaximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1):
"""BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown [HARRIS Fig 6.14]
Implement nonmaximal downsampling down converter for one bin, extend [HARRIS Fig 6.14].
The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of frequency bins, is DFT size. The polyphase
FIR structure has Nphases = Ndft branches, to support any Ndown <= Ndft. The input data shifts in per Ndown
samples, so it appears in different branches when Ndown < Ndft. Therefore the input data cannot be FIR filtered
per branch for the whole input x. Instead it needs to be FIR filtered per block of Ndown input samples from x,
using pfs.polyDelays in pfs.filter_block().
. see downsample()
Input:
. x: Input signal x[n]
. Ndown: downsample factor
. k: Index of BPF center frequency w_k = 2 pi k / Ndft
. Ndft: DFT size, number of polyphases in FIR structure
. coefs: prototype FIR filter coefficients for anti aliasing BPF
- verbosity: when > 0 print() status, else no print()
Return:
. y: Downsampled and down converted output signal y[m], m = n // D for bin
k. Complex baseband signal.
"""
a = [1.0] # FIR b = coefs, a = 1
# Polyphase implementation to avoid calculating values that are removed
# coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# polyCoefs = [[ 3, 7, 11],
# [ 2, 6, 10],
# [ 1, 5, 9],
# [ 0, 4, 8]])
pfs = PolyPhaseFirFilterStructure(Ndft, coefs, commutator='down')
polyCoefs = pfs.polyCoefs
# Poly phases for whole data signal x, prepended with Ndown - 1 zeros
polyX, Nx, Nxp = poly_structure_data_for_downsampling_whole_x(x, Ndown) # size (Ndown, Nxp), Ny = Nxp
# Filter x per polyphase, order in polyCoefs accounts for commutator [HARRIS Fig 6.12, 6.13]
polyY = np.zeros((Ndown, Nxp))
for p in range(Ndown):
polyY[p] = signal.lfilter(polyCoefs[p], a, polyX[p])
# Phase rotate per polyphase, due to delay line at branch inputs [HARRIS Eq 6.8]
# . polyY can use index p, because order in polyY accounts for commutator,
# . phase rotator needs to use pCommutator to account for commutator, to fit
# order in polyY and polyCoefs
polyYC = np.zeros((Ndown, Nxp), dtype='cfloat')
for p in range(Ndown):
pCommutator = Ndown - 1 - p
polyYC[p] = polyY[p] * np.exp(1j * 2 * np.pi * pCommutator * k / Ndown)
# Sum the branch outputs to get single downsampled and downconverted output value
y = np.sum(polyYC, axis=0)
if verbosity:
print('> downsample_bpf():')
print('. len(x) =', str(len(x)))
print('. Ndown =', str(Ndown))
print('. Nx =', str(Nx))
print('. Nxp =', str(Nxp))
print('. len(y) =', str(len(y))) # = Nxp
print('. k =', str(k))
print('')
return y
def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate and decimate by Nup / Ndown def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate and decimate by Nup / Ndown
"""Resample x by factor U / D = Nup / Ndown """Resample x by factor U / D = Nup / Ndown
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment