Skip to content
Snippets Groups Projects
Commit 5a311a1b authored by Eric Kooistra's avatar Eric Kooistra
Browse files

Add filter_up() method.

parent 9e11c085
Branches
No related tags found
1 merge request!419Resolve RTSD-265
...@@ -95,7 +95,7 @@ class PolyPhaseFirFilterStructure: ...@@ -95,7 +95,7 @@ class PolyPhaseFirFilterStructure:
output samples for every input sample [HARRIS Fig 7.11, 7.12, CROCHIERE output samples for every input sample [HARRIS Fig 7.11, 7.12, CROCHIERE
Fig 3.25]. Fig 3.25].
- Downsampling uses the PFS as a Direct-Form FIR filter, where the - Downsampling uses the PFS as a Direct-Form FIR filter, where the
commutator selects the polyphases from phase Nphase - 1 downto 0 to sum commutator selects the polyphases from phase Nphases - 1 downto 0 to sum
samples from the past for one output sample [HARRIS Fig 6.12, 6.13, samples from the past for one output sample [HARRIS Fig 6.12, 6.13,
CROCHIERE Fig 3.29]. CROCHIERE Fig 3.29].
- In both cases the commutator rotates counter clockwise, because the PFS - In both cases the commutator rotates counter clockwise, because the PFS
...@@ -208,7 +208,7 @@ class PolyPhaseFirFilterStructure: ...@@ -208,7 +208,7 @@ class PolyPhaseFirFilterStructure:
def map_to_poly_delays(self, delayLine): def map_to_poly_delays(self, delayLine):
self.polyDelays = delayLine.reshape((self.Ntaps, self.Nphases)).T self.polyDelays = delayLine.reshape((self.Ntaps, self.Nphases)).T
def shift_in_data(self, inData, flipped=True): def shift_in_data(self, inData, flipped):
"""Shift block of data into the polyDelays structure. """Shift block of data into the polyDelays structure.
View polyDelays as delay line if L < Nphases. Shift in from the left View polyDelays as delay line if L < Nphases. Shift in from the left
...@@ -238,33 +238,23 @@ class PolyPhaseFirFilterStructure: ...@@ -238,33 +238,23 @@ class PolyPhaseFirFilterStructure:
self.polyDelays = np.roll(self.polyDelays, 1, axis=1) self.polyDelays = np.roll(self.polyDelays, 1, axis=1)
self.polyDelays[:, 0] = xData self.polyDelays[:, 0] = xData
def filter_block(self, inData, sampling, flipped=True): def filter_block(self, inData, flipped):
"""Filter block of input data per polyphase for downsampling or for """Filter block of inData per polyphase.
upsampling.
Input: Input:
. inData: block of input data for downsampling or single input data for . inData: block of input data with length <= Nphases, time index n in
upsampling. inData[n] increments, so inData[0] is oldest data and inData[-1] is
. sampling: 'down' or 'up' newest data. The inData block size is = 1 for upsampling or > 1
- 'down': for downsampling inData[n] is a block of length L time and <= Nphases for downsampling.
samples, with 1 <= L <= Nphases [HARRIS Fig. 6.12, 6.13]. The time . flipped: False then inData order is inData[n] with newest sample last
index n in inData[n] increments, so inData[0] is oldest data and and still needs to be flipped. If True then the inData is already
inData[-1] is newest data when flipped is False. flipped in time, so with newest sample first.
- 'up': for upsampling inData length L = 1 sample [HARRIS Fig. 7.11,
7.12]. The inData is fanout to the Nphases.
. flipped: False then inData order is inData[n] for downsampling and
still needs to be flipped. If True then the inData is already
flipped. Not used for upsampling.
Return: Return:
. pfsData: block of polyphase FIR filtered output data for Nphases, with . pfsData: block of polyphase FIR filtered output data for Nphases, with
pfsData[p] and p = 0:Nphases-1 from top to bottom. pfsData[p] and p = 0:Nphases-1 from top to bottom.
""" """
# Shift in one block of input data (1 <= len(inData) <= Nphases) # Shift in one block of input data (1 <= len(inData) <= Nphases)
if sampling == 'down':
self.shift_in_data(inData, flipped) self.shift_in_data(inData, flipped)
else: # 'up'
inWires = inData * ones(self.Nphases, np.iscomplexobj(inData))
self.shift_in_data(inWires, flipped=True)
# Apply FIR coefs per delay element # Apply FIR coefs per delay element
zData = self.polyDelays * self.polyCoefs zData = self.polyDelays * self.polyCoefs
# Sum FIR taps per polyphase # Sum FIR taps per polyphase
...@@ -272,6 +262,22 @@ class PolyPhaseFirFilterStructure: ...@@ -272,6 +262,22 @@ class PolyPhaseFirFilterStructure:
# Output block of Nphases filtered output data # Output block of Nphases filtered output data
return pfsData return pfsData
def filter_up(self, inSample):
"""Filter same input sample at each polyphase, to upsample it by factor
U = Nphases.
Input:
. inSample: input sample that is applied to each polyphase
Return:
. pfsData: block of polyphase FIR filtered output data for Nphases,
with pfsData[p] and p = 0:Nphases-1 from top to bottom. The
pfsData[0] is the first and thus oldest and pfsData[-1] is the last
and thus newest interpolated data.
"""
inWires = inSample * ones(self.Nphases, np.iscomplexobj(inSample))
pfsData = self.filter_block(inWires, flipped=True)
return pfsData
############################################################################### ###############################################################################
# Up, down, resample low pass input signal # Up, down, resample low pass input signal
...@@ -641,7 +647,7 @@ def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate an ...@@ -641,7 +647,7 @@ def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate an
# Single bandpass channel up and downsampling and up and downconversion # Single bandpass channel up and downsampling and up and downconversion
############################################################################### ###############################################################################
def unit_circle_loops_phasor_arr(k, N, sign=1): def unit_circle_loops_phasor_arr(k, N, sign):
"""Return array of N phasors on k loops along the unit circle. """Return array of N phasors on k loops along the unit circle.
Polyphase dependent phase offsets for bin k, when N = Ndown = Ndft [HARRIS Polyphase dependent phase offsets for bin k, when N = Ndown = Ndft [HARRIS
...@@ -709,7 +715,7 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): ...@@ -709,7 +715,7 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1):
# Phase rotate per polyphase for bin k, due to delay line at branch inputs # Phase rotate per polyphase for bin k, due to delay line at branch inputs
# [HARRIS Eq 6.8] # [HARRIS Eq 6.8]
kPhasors = unit_circle_loops_phasor_arr(k, Ndown) kPhasors = unit_circle_loops_phasor_arr(k, Ndown, 1)
polyYc = np.zeros((Ndown, Nxp), dtype='cfloat') polyYc = np.zeros((Ndown, Nxp), dtype='cfloat')
for p in range(Ndown): for p in range(Ndown):
polyYc[p] = polyY[p] * kPhasors[p] # row = row * scalar polyYc[p] = polyY[p] * kPhasors[p] # row = row * scalar
...@@ -760,8 +766,9 @@ def maximal_upsample_bpf(xBase, Nup, k, coefs, verbosity=1): ...@@ -760,8 +766,9 @@ def maximal_upsample_bpf(xBase, Nup, k, coefs, verbosity=1):
polyY, Nx, Nxp = polyphase_frontend(xBase, Nup, coefs, 'up') polyY, Nx, Nxp = polyphase_frontend(xBase, Nup, coefs, 'up')
# Phase rotate per polyphase for bin k, due to delay line at branch inputs # Phase rotate per polyphase for bin k, due to delay line at branch inputs
# [HARRIS Eq 7.8 = 6.8] # [HARRIS Eq 7.8 = 6.8, Fig 7.16], can be applied after the FIR filter,
kPhasors = unit_circle_loops_phasor_arr(k, Nup) # because the kPhasors per polyphase are constants.
kPhasors = unit_circle_loops_phasor_arr(k, Nup, 1)
polyYc = np.zeros((Nup, Nxp), dtype='cfloat') polyYc = np.zeros((Nup, Nxp), dtype='cfloat')
for p in range(Nup): for p in range(Nup):
polyYc[p] = polyY[p] * kPhasors[p] # row = row * scalar polyYc[p] = polyY[p] * kPhasors[p] # row = row * scalar
...@@ -770,7 +777,7 @@ def maximal_upsample_bpf(xBase, Nup, k, coefs, verbosity=1): ...@@ -770,7 +777,7 @@ def maximal_upsample_bpf(xBase, Nup, k, coefs, verbosity=1):
yc = polyYc.T.reshape(1, Nup * Nx)[0] yc = polyYc.T.reshape(1, Nup * Nx)[0]
if verbosity: if verbosity:
print('> Log maximal_downsample_bpf():') print('> Log maximal_upsample_bpf():')
print(' . len(xBase) =', str(len(xBase))) print(' . len(xBase) =', str(len(xBase)))
print(' . Nx =', str(Nx)) print(' . Nx =', str(Nx))
print(' . Nxp =', str(Nxp)) print(' . Nxp =', str(Nxp))
...@@ -822,7 +829,7 @@ def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1): ...@@ -822,7 +829,7 @@ def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1):
# PFS with Ndft polyphases # PFS with Ndft polyphases
pfs = PolyPhaseFirFilterStructure(Ndft, coefs) pfs = PolyPhaseFirFilterStructure(Ndft, coefs)
kPhasors = unit_circle_loops_phasor_arr(k, Ndft) # [HARRIS Eq 6.8] kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) # [HARRIS Eq 6.8]
# Oversampling time shift compensation via frequency dependent phase shift # Oversampling time shift compensation via frequency dependent phase shift
tPhasors = time_shift_phasor_arr(k, Ndown, Ndft, Nblocks, -1) # [HARRIS Eq 9.3] tPhasors = time_shift_phasor_arr(k, Ndown, Ndft, Nblocks, -1) # [HARRIS Eq 9.3]
...@@ -830,12 +837,12 @@ def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1): ...@@ -830,12 +837,12 @@ def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1):
for b in range(Nblocks): for b in range(Nblocks):
# Filter block # Filter block
inData = xBlocks[:, b] inData = xBlocks[:, b]
pfsData = pfs.filter_block(inData, 'down', flipped=True) pfsData = pfs.filter_block(inData, flipped=True)
# Phase rotate polyphases for bin k # Phase rotate polyphases for bin k
pfsBinData = pfsData * kPhasors * tPhasors[b] # [HARRIS Eq 6.8, 9.3] pfsBinData = pfsData * kPhasors # [HARRIS Eq 6.8, 9.3]
# Sum the polyphases to get single downsampled and downconverted output # Sum the polyphases to get single downsampled and downconverted output
# value # value
yc[b] = np.sum(pfsBinData) yc[b] = np.sum(pfsBinData) * tPhasors[b]
if verbosity: if verbosity:
print('> non_maximal_downsample_bpf():') print('> non_maximal_downsample_bpf():')
...@@ -867,7 +874,7 @@ def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1): ...@@ -867,7 +874,7 @@ def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1):
so it appears from different branches when Nup < Ndft and a new block is so it appears from different branches when Nup < Ndft and a new block is
shifted out. Therefore the output data cannot be FIR filtered per branch shifted out. Therefore the output data cannot be FIR filtered per branch
for the whole input xBase. Instead it needs to be FIR filtered per block of for the whole input xBase. Instead it needs to be FIR filtered per block of
Nup output samples, using pfs.polyDelays in pfs.filter_block(). Nup output samples, using pfs.polyDelays in pfs.filter_up().
Input: Input:
. xBase: Input equivalent baseband signal xBase[m] . xBase: Input equivalent baseband signal xBase[m]
...@@ -888,28 +895,21 @@ def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1): ...@@ -888,28 +895,21 @@ def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1):
polyYc = np.zeros((Nup, Nblocks), dtype='cfloat') polyYc = np.zeros((Nup, Nblocks), dtype='cfloat')
# PFS with Ndft polyphases # PFS with Ndft polyphases
pfs = PolyPhaseFirFilterStructure(Ndft, coefs, cmplx=True) pfsUp = PolyPhaseFirFilterStructure(Nup, coefs, cmplx=True)
kPhasors = unit_circle_loops_phasor_arr(k, Ndft) # [HARRIS Eq 7.8] kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) # [HARRIS Eq 7.8]
# Oversampling time shift compensation via frequency dependent phase shift # Oversampling time shift compensation via frequency dependent phase shift
tPhasors = time_shift_phasor_arr(k, Nup, Ndft, Nblocks, -1) # [HARRIS Eq 9.3] tPhasors = time_shift_phasor_arr(k, Nup, Ndft, Nblocks, -1) # [HARRIS Eq 9.3]
# Polyphase FIR filter input xBase # Polyphase FIR filter input xBase
nLo = 0
for b in range(Nblocks): for b in range(Nblocks):
# Filter block # Filter block
inData = xBase[b] inSample = xBase[b]
pfsData = Nup * pfs.filter_block(inData, 'up') pfsData = Nup * pfsUp.filter_up(inSample)
# Phase rotate polyphases for bin k # Phase rotate polyphases for bin k
pfsBinData = pfsData * kPhasors * tPhasors[b] # [HARRIS Eq 7.8, 9.3] pfsBinData = pfsData * kPhasors[0:Nup]
# Output Nup samples yc[n] for every input sample xBase[m] # Output Nup samples yc[n] for every input sample xBase[m]
nEnd = nLo + Nup outBinData = pfsBinData * tPhasors[b]
if nEnd <= Ndft:
outBinData = pfsBinData[nLo : nEnd]
else:
outBinData = np.concatenate((pfsBinData[nLo : Ndft],
pfsBinData[0 : nEnd - Ndft]))
nLo = nEnd % Ndft
polyYc[:, b] = outBinData polyYc[:, b] = outBinData
yc = polyYc.T.reshape(1, Nup * Nblocks)[0] yc = polyYc.T.reshape(1, Nup * Nblocks)[0]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment