Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
H
HDL
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Requirements
Jira
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Container registry
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
RTSD
HDL
Commits
7927866d
Commit
7927866d
authored
9 months ago
by
Eric Kooistra
Browse files
Options
Downloads
Patches
Plain Diff
Improve descriptions.
parent
f82baf8c
No related branches found
No related tags found
1 merge request
!419
Resolve RTSD-265
Pipeline
#91850
passed
9 months ago
Stage: linting
Changes
2
Pipelines
1
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
applications/lofar2/model/pfb_os/multirate_mixer.ipynb
+170
-50
170 additions, 50 deletions
applications/lofar2/model/pfb_os/multirate_mixer.ipynb
applications/lofar2/model/rtdsp/multirate.py
+326
-307
326 additions, 307 deletions
applications/lofar2/model/rtdsp/multirate.py
with
496 additions
and
357 deletions
applications/lofar2/model/pfb_os/multirate_mixer.ipynb
+
170
−
50
View file @
7927866d
Source diff could not be displayed: it is too large. Options to address this:
view the blob
.
This diff is collapsed.
Click to expand it.
applications/lofar2/model/rtdsp/multirate.py
+
326
−
307
View file @
7927866d
...
...
@@ -39,6 +39,8 @@ import numpy as np
from
scipy
import
signal
from
.utilities
import
c_rtol
,
c_atol
,
ceil_div
c_firA
=
[
1.0
]
# FIR b = coefs, a = 1
def
down
(
x
,
D
,
phase
=
0
):
"""
Downsample x[n] by factor D, xD[m] = x[m D], m = n // D
...
...
@@ -60,16 +62,17 @@ def up(x, U):
###############################################################################
# Polyphase filter
# Polyphase filter
structure for input block processing
###############################################################################
class
PolyPhaseFirFilterStructure
:
"""
Polyphase FIR filter structure (PFS) per block of data
Input:
. coefs: FIR coefficients b[0 : Nphases * Ntaps - 1].
. Nphases: number of polyphases, is number of rows (axis=0) in the
polyphase structure.
Purpose of PFS implementation is to avoid multiply by zero values in case
of upsampling and to avoid calculating samples that will be discarded in
case of downsampling. The output result of the PFS FIR is identical to
using the FIR filter. The spectral aliasing and spectral replication are
due to the sample rate change, not to the implementation structure.
The PFS is is suitable for downsampling and for upsampling:
- Upsampling uses the PFS as Transposed Direct-Form FIR filter, where the
...
...
@@ -84,66 +87,71 @@ class PolyPhaseFirFilterStructure:
uses a delay line of z^(-1) called type 1 structure [CROCHIERE 3.3.3,
VAIDYANATHAN 4.3].
Input:
. coefs: FIR coefficients b[0 : Nphases * Ntaps - 1].
. Nphases: number of polyphases, is number of rows (axis=0) in the
polyphase structure.
Derived:
. Ntaps: number of taps per polyphase, is number of columns (axis=1) in
the polyphase structure.
. polyCoefs: FIR coefficients storage with Nphases rows and Ntaps columns.
. polyDelays: FIR taps storage with Nphases rows and Ntaps columns.
Storage of FIR coefficients and FIR data:
Stream of input samples x[n], n >= 0 for increasing time. Show index n
also as value so x[n] = n:
oldest sample newest sample
x v v
samples [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...]
blocks [ 0, 1, 2, 3]
^
newest block
The delayLine index corresponds to the FIR coefficient index k of
b[k] = impulse response h[k]. For Ncoefs = 12, with Nphases = 4 rows and
Ntaps = 3 columns:
polyCoefs = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11] = b[k]
row = polyphase
polyCoefs = [[0, 4, 8], 0
[1, 5, 9], 1
[2, 6,10], 2
[3, 7,11]] 3
column: 0, 1, 2
Shift x[n] samples into delayLine from left, show sample index n in (n)
and delayLine index k in [k]. Shift in per sample or per block.
shift in -->(15,14,13,12,11,10, 9, 8, 7, 6, 5, 4) --> shift out
delayLine = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11]
For polyDelays shift in from left-top (n = 15 is newest input sample)
and shift-out from right-bottom (n = 4 is oldest sample).
v v
polyDelays = [[0, 4, 8], ((15,11, 7),
[1, 5, 9], (14,10, 6),
[2, 6,10], (13, 9, 5),
[3, 7,11]] (12, 8, 4))
v v
Output sample y[n] = sum(polyCoefs * polyDelays).
For downsampling by Ndown = Nphases shift blocks of Ndown samples in
from left in polyDelays, put newest block[3] = x[12,13,14,15] in first
column [0] of polyDelays.
Store input samples in polyDelays structure, use mapping to access as
delayLine:
. map_to_delay_line()
. map_to_poly_delays()
"""
def
__init__
(
self
,
Nphases
,
coefs
):
self
.
coefs
=
coefs
self
.
Ncoefs
=
len
(
coefs
)
self
.
Nphases
=
Nphases
# branches, rows
self
.
Ntaps
=
ceil_div
(
self
.
Ncoefs
,
Nphases
)
# taps, columns
# Stream of input samples x[n], n >= 0 for increasing time. Show index
# n also as value so x[n] = n:
#
# oldest sample newest sample
# x v v
# samples [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...]
# blocks [ 0, 1, 2, 3]
# ^
# newest block
#
# The delayLine index corresponds to the FIR coefficient index k of
# b[k] = impulse response h[k]. For Ncoefs = 12, with Nphases = 4 rows
# and Ntaps = 3 columns:
#
# polyCoefs = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11] = b[k]
# row = polyphase
# polyCoefs = [[0, 4, 8], 0
# [1, 5, 9], 1
# [2, 6,10], 2
# [3, 7,11]] 3
# column: 0, 1, 2
#
# Shift x[n] samples into delayLine from left, show sample index n in
# (n) and delayLine index k in [k]. Shift in per sample or per block.
#
# shift in -->(15,14,13,12,11,10, 9, 8, 7, 6, 5, 4) --> shift out
# delayLine = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11]
#
# For polyDelays shift in from left-top (n = 15 is newest input sample)
# and shift-out from right-bottom (n = 4 is oldest sample).
#
# v v
# polyDelays = [[0, 4, 8], ((15,11, 7),
# [1, 5, 9], (14,10, 6),
# [2, 6,10], (13, 9, 5),
# [3, 7,11]] (12, 8, 4))
# v v
#
# Output sample y[n] = sum(polyCoefs * polyDelays).
#
# For downsampling by Ndown = Nphases shift blocks of Ndown samples in
# from left in polyDelays, put newest block[3] = x[12,13,14,15] in
# first column [0] of polyDelays.
#
# Store input samples in polyDelays structure, use mapping to access as
# delayLine:
# . map_to_delay_line()
# . map_to_poly_delays()
self
.
init_poly_coeffs
()
self
.
reset_poly_delays
()
...
...
@@ -231,52 +239,134 @@ class PolyPhaseFirFilterStructure:
return
pfsData
def
poly_data_for_downsampling_whole_x
(
x
,
Ndown
):
###############################################################################
# Up, down, resample low pass input signal
###############################################################################
def
polyphase_data_for_downsampling_whole_x
(
x
,
Ndown
,
Nzeros
):
"""
Polyphase data structure for downsampling whole signal x.
The polyphase structure has Ndown branches and branch size suitable to use
lfilter() once per polyphase branch for whole signal x. Instead of using
pfs.polyDelays per pfs.Ntaps.
. Prepend x with Ndown - 1 zeros to have first down sampled sample
at m = 0
start at n = 0.
. Skip any remaining last samples from x, that are not enough yield a
new
output FIR sum.
. Prepend x with
Nzeros =
Ndown - 1 zeros to have first down sampled sample
at m = 0
start at n = 0.
. Skip any remaining last samples from x, that are not enough
to
yield a
new
output FIR sum.
Input:
. x: input samples x[n] for n = 0 : Lx - 1
. x:
all
input samples x[n] for n = 0 : Lx - 1
. Ndown: downsample rate and number of polyphase branches
. Nzeros: number of zero samples to prepend for x
Return:
. polyX: polyphase data structure with size (Ndown, Nxp) for Ndown branches
and Nxp samples from x per branch.
. Nx: Total number of samples from x, including prepended Ndown - 1 zeros.
. Nxp: Total number of samples used from x per polyphase branch, is
Ny
the
number of samples that will be in downsampled output y[m] = x[m D]
. Nxp: Total number of samples used from x per polyphase branch, is the
number of samples
Ny,
that will be in downsampled output y[m] = x[m D]
,
for m = 0, 1, 2, ..., Nxp - 1.
"""
Lx
=
len
(
x
)
# Numer of samples per polyphase
Nxp
=
(
Ndown
-
1
+
Lx
)
//
Ndown
# Used number of samples from x
Nx
=
1
+
Ndown
*
(
Nxp
-
1
)
Nxp
=
(
Nzeros
-
1
+
Lx
)
//
Ndown
# Number of samples per polyphase
Nx
=
1
+
Ndown
*
(
Nxp
-
1
)
# Used number of samples from x
# Load x into polyX with Ndown rows = polyphases
# . prepend x with Ndown - 1 zeros
# . skip any last remaining samples from x, that are not enough yield a new
# output FIR sum.
# . store data in time order per branch, so with oldest data left, to match
# use with lfilter(). Note this differs from order of tap data in
# pfs.polyDelays where newest data is left, to match use in
# pfs.filter_block that uses pfs.polyDelays * pfs.polyCoefs to filter the
# block.
polyX
=
np
.
zeros
(
Ndown
*
Nxp
)
polyX
[
Ndown
-
1
]
=
x
[
0
]
polyX
[
Ndown
:]
=
x
[
1
:
Nx
]
# . Newest sample in top branch, to match branch order in the
# pfs.polyCoefs, therefore do flipud(polyX)
# . Store data in time order per branch, so with oldest data left, to match
# use with lfilter(). Note this differs from order of tap data in
# pfs.polyDelays where newest data is left, because the block data is
# flipped by shift_in_data(), to match use in pfs.filter_block, so that
# it can use pfs.polyDelays * pfs.polyCoefs to filter the block.
# . The newest sample x[-1] has to be in top branch of polyX and the oldest
# sample x[0] in bottom branch, to match branch order of 0:Ndown-1 from
# top to bottom in the pfs.polyCoefs, therefore do flipud(polyX). This
# flipud() is similar as the flip() per block in shift_in_data().
#
# x[0] is oldest x[-1] is newest
# | |
# x[n] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15]
# (0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15), x[n] = n
# newest
# poly poly v |
# Coefs = [[0, 4, 8], Delays = ((15,11, 7), polyX = ((3, 7,11,15),
# [1, 5, 9], (14,10, 6), (2, 6,10,14),
# [2, 6,10], (13, 9, 5), (1, 5, 9,13),
# [3, 7,11]] (12, 8, 4)) (0, 4, 8,12))
# v |
# oldest
polyX
=
np
.
flipud
(
polyX
.
reshape
(
Nxp
,
Ndown
).
T
)
return
polyX
,
Nx
,
Nxp
def
polyphase_frontend
(
x
,
Nphases
,
coefs
,
sampling
):
"""
Calculate PFS FIR filter output of x for Nphases.
The polyY[Nphases] output of this PFS frontend can be:
. summed for dowsampling by factor D = Nphases
. used as is for upsampling by factor U = Nphases
. used with a range of Nphases LOs for a single channel downsampler
downconverter, or upsampler upconverter
. used with a IDFT for a analysis filterbank (PFB), or synthesis PFB.
Input:
. x: Input signal x[n]
. Nphases: number of polyphase branches in PFS
. coefs: prototype FIR filter coefficients for anti aliasing LPF. The
len(coefs) typically is multiple of Nphases. If shorter, then the
coefs are extended with zeros.
. sampling:
'
up
'
or
'
down
'
Return:
. polyY[Nphases]: Output of the FIR filtered branches in the PFS.
. Nx = np.size(polyY), total number of samples from x
. Nxp = np.size(polyY, axis=1), total number of samples from x per
polyphase.
"""
# Use polyphase FIR filter coefficients from pfs class.
pfs
=
PolyPhaseFirFilterStructure
(
Nphases
,
coefs
)
# Define polyphases for whole data signal x with Nx samples, instead of
# using polyDelays per Ntaps from pfs class, to be able to use
# signal.lfilter() per polyphase branch for the whole data signal x.
if
sampling
==
'
up
'
:
Nx
=
len
(
x
)
Nxp
=
Nx
Nup
=
Nphases
# Filter whole x per polyphase, so Nxp = Nx, because the Nup branches
# each use all x to yield Nup output samples per input sample from x.
# The commutator index order for upsampling is p = 0, 1, .., Nup - 1,
# so from top to bottom in the PFS.
polyY
=
np
.
zeros
((
Nup
,
Nx
))
pCommutator
=
range
(
Nup
)
for
p
in
pCommutator
:
polyY
[
p
]
=
signal
.
lfilter
(
pfs
.
polyCoefs
[
p
],
c_firA
,
x
)
else
:
# 'down':
# . Prepend x with Ndown - 1 zeros, to have y[m] for m = 0 start at
# n = 0 of x[n].
# . Size of polyX is (Ndown, Nxp), and length of y is Ny = Nxp is
# length of each branch.
Ndown
=
Nphases
Nzeros
=
Ndown
-
1
polyX
,
Nx
,
Nxp
=
polyphase_data_for_downsampling_whole_x
(
x
,
Ndown
,
Nzeros
)
print
(
polyX
[:,
0
])
# Filter Ndown parts of x per polyphase, because the FIR filter output
# y will sum. The commutator index order for downsampling is p =
# Ndown - 1,..., 1, 0, so from bottom to top in the PFS. However, the
# commutator index order is only relevant when the branches are
# calculated sequentially to reuse the same hardware, because for the
# output y the branches are summed anyway.
polyY
=
np
.
zeros
((
Ndown
,
Nxp
))
pCommutator
=
np
.
flip
(
np
.
arange
(
Ndown
))
for
p
in
pCommutator
:
polyY
[
p
]
=
signal
.
lfilter
(
pfs
.
polyCoefs
[
p
],
c_firA
,
polyX
[
p
])
return
polyY
,
Nx
,
Nxp
def
upsample
(
x
,
Nup
,
coefs
,
verify
=
False
,
verbosity
=
1
):
# interpolate
"""
Upsample x by factor U = Nup and LPF.
...
...
@@ -312,47 +402,26 @@ def upsample(x, Nup, coefs, verify=False, verbosity=1): # interpolate
- when (Ncoefs - 1) % (2 * U) == 0, then the group delay is an integer
number of ts periods.
"""
Nx
=
len
(
x
)
a
=
[
1.0
]
# FIR b = coefs, a = 1
# Polyphase implementation to avoid multiply by zero values
# coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# polyCoefs = [[ 0, 4, 8], # p = 0
# [ 1, 5, 9], # p = 1
# [ 2, 6, 10], # p = 2
# [ 3, 7, 11]]) # p = 3
pfs
=
PolyPhaseFirFilterStructure
(
Nup
,
coefs
)
polyCoefs
=
pfs
.
polyCoefs
# Poly phases for data
# . Use polyX for whole data signal x with Nx samples, instead of
# pfs.polyDelays per pfs.Ntaps, to be able to use signal.lfilter() per
# branch for whole data signal x.
polyY
=
np
.
zeros
((
Nup
,
Nx
))
# Filter x per polyphase, because the Nup branches each use all x to yield
# Nup output samples per input sample from x. The commutator index order
# for upsampling is p = 0, 1, .., Nup - 1, so from top to bottom in pfs.
pCommutator
=
range
(
Nup
)
for
p
in
pCommutator
:
polyY
[
p
]
=
signal
.
lfilter
(
polyCoefs
[
p
],
a
,
x
)
# Polyphase FIR filter input x
polyY
,
Nx
,
_
=
polyphase_frontend
(
x
,
Nup
,
coefs
,
'
up
'
)
# Output Nup samples for every input sample
# Output Nup samples
y[m]
for every input sample
x[n]
y
=
polyY
.
T
.
reshape
(
1
,
Nup
*
Nx
)[
0
]
if
verify
:
# Inefficient upsampling implementation with multiply by zero values
# Inefficient upsampling implementation with multiply by zero values.
# Verify that x --> U --> LPF --> y yields identical result y as with
# using the PFS: x --> PFS FIR --> y.
xZeros
=
np
.
zeros
((
Nup
,
Nx
))
xZeros
[
0
]
=
x
xZeros
=
xZeros
.
T
.
reshape
(
1
,
Nup
*
Nx
)[
0
]
# upsample
yVerify
=
signal
.
lfilter
(
coefs
,
a
,
xZeros
)
# LPF
yVerify
=
signal
.
lfilter
(
coefs
,
c_firA
,
xZeros
)
# LPF
print
(
'
> Verify upsample():
'
)
if
np
.
allclose
(
y
,
yVerify
,
rtol
=
c_rtol
,
atol
=
c_atol
):
print
(
'
. PASSED: correct upsample result
'
)
else
:
print
(
'
. ERROR: wrong upsample result
'
)
return
False
if
verbosity
:
print
(
'
> Log upsample():
'
)
print
(
'
. Nup =
'
,
str
(
Nup
))
...
...
@@ -397,35 +466,18 @@ def downsample(x, Ndown, coefs, verify=False, verbosity=1): # decimate
- when (Ncoefs - 1) % (2 * D) == 0, then the group delay is an integer
number of tsDown periods
"""
a
=
[
1.0
]
# FIR b = coefs, a = 1
# Polyphase implementation to avoid calculating values that are removed
pfs
=
PolyPhaseFirFilterStructure
(
Ndown
,
coefs
)
polyCoefs
=
pfs
.
polyCoefs
# Poly phases for whole data signal x, prepended with Ndown - 1 zeros.
# Size of polyX is (Ndown, Nxp), and length of y is Ny = Nxp is length
# of each branch.
polyX
,
Nx
,
Nxp
=
poly_data_for_downsampling_whole_x
(
x
,
Ndown
)
# Filter x per polyphase, commutator index order for downsampling is
# p = Ndown - 1, ..., 1, 0, so from bottom to top in pfs. However, the
# commutator index order is only relevant when the branches are
# calculated sequentially to reuse the same hardware, because for the
# output y the branches are summed anyway.
polyY
=
np
.
zeros
((
Ndown
,
Nxp
))
pCommutator
=
np
.
flip
(
np
.
arange
(
Ndown
))
for
p
in
pCommutator
:
polyY
[
p
]
=
signal
.
lfilter
(
polyCoefs
[
p
],
a
,
polyX
[
p
])
# Polyphase FIR filter input x
polyY
,
Nx
,
Nxp
=
polyphase_frontend
(
x
,
Ndown
,
coefs
,
'
down
'
)
#
Sum the branch outputs to get single downsampled output value
#
FIR filter sum
y
=
np
.
sum
(
polyY
,
axis
=
0
)
if
verify
:
# Inefficient downsampling implementation with calculating values that
# are removed, so x --> LPF --> D --> y:
# are removed. Verify that x --> LPF --> D --> y yields identical
# result y as with using the PFS: x --> PFS FIR --> y.
yVerify
=
np
.
zeros
(
Ndown
*
Nxp
)
yVerify
[
0
:
Nx
]
=
signal
.
lfilter
(
coefs
,
a
,
x
[
0
:
Nx
])
# LPF
yVerify
[
0
:
Nx
]
=
signal
.
lfilter
(
coefs
,
c_firA
,
x
[
0
:
Nx
])
# LPF
yVerify
=
yVerify
.
reshape
(
Nxp
,
Ndown
).
T
# polyphases
yVerify
=
yVerify
[
0
]
# downsample by D
print
(
'
> Verify downsample():
'
)
...
...
@@ -434,7 +486,6 @@ def downsample(x, Ndown, coefs, verify=False, verbosity=1): # decimate
else
:
print
(
'
. ERROR: wrong downsample result
'
)
return
False
if
verbosity
:
print
(
'
> Log downsample():
'
)
print
(
'
. len(x) =
'
,
str
(
len
(
x
)))
...
...
@@ -446,159 +497,6 @@ def downsample(x, Ndown, coefs, verify=False, verbosity=1): # decimate
return
y
def
maximal_downsample_bpf
(
x
,
Ndown
,
k
,
coefs
,
verbosity
=
1
):
"""
BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown.
Implement maximal downsampling down converter for one bin (= critically
sampled) [HARRIS Fig 6.14].
The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of
frequency bins, is DFT size. The downsampling is maximal so Ndown = Ndft.
The polyphase structure has Nphases = Ndown branches, so the input x
data that shifts in remains in each branch. Therefore each branch can be
FIR filtered independently for the whole input x using polyX.
. see downsample()
Input:
. x: Input signal x[n]
. Ndown: downsample factor
. k: Index of BPF center frequency w_k = 2 pi k / Ndown
. coefs: prototype FIR filter coefficients for anti aliasing BPF
- verbosity: when > 0 print() status, else no print()
Return:
. y: Downsampled and down converted output signal y[m], m = n // D for bin
k. Complex baseband signal.
"""
a
=
[
1.0
]
# FIR b = coefs, a = 1
# Polyphase implementation to avoid calculating values that are removed
# coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# polyCoefs = [[ 3, 7, 11],
# [ 2, 6, 10],
# [ 1, 5, 9],
# [ 0, 4, 8]])
pfs
=
PolyPhaseFirFilterStructure
(
Ndown
,
coefs
,)
polyCoefs
=
pfs
.
polyCoefs
# Poly phases for whole data signal x, prepended with Ndown - 1 zeros
polyX
,
Nx
,
Nxp
=
poly_data_for_downsampling_whole_x
(
x
,
Ndown
)
# Filter x per polyphase [HARRIS Fig 6.12, 6.13]
polyY
=
np
.
zeros
((
Ndown
,
Nxp
))
for
p
in
range
(
Ndown
):
polyY
[
p
]
=
signal
.
lfilter
(
polyCoefs
[
p
],
a
,
polyX
[
p
])
# Phase rotate per polyphase for bin k, due to delay line at branch inputs
# [HARRIS Eq 6.8]
polyYC
=
np
.
zeros
((
Ndown
,
Nxp
),
dtype
=
'
cfloat
'
)
for
p
in
range
(
Ndown
):
polyYC
[
p
]
=
polyY
[
p
]
*
np
.
exp
(
1j
*
2
*
np
.
pi
*
p
*
k
/
Ndown
)
# Sum the branch outputs to get single downsampled and downconverted output
# value
y
=
np
.
sum
(
polyYC
,
axis
=
0
)
if
verbosity
:
print
(
'
> Log downsample_bpf():
'
)
print
(
'
. len(x) =
'
,
str
(
len
(
x
)))
print
(
'
. Ndown =
'
,
str
(
Ndown
))
print
(
'
. Nx =
'
,
str
(
Nx
))
print
(
'
. Nxp =
'
,
str
(
Nxp
))
print
(
'
. len(y) =
'
,
str
(
len
(
y
)))
# = Nxp
print
(
'
. k =
'
,
str
(
k
))
print
(
''
)
return
y
# def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1):
# """BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown [HARRIS Fig 6.14]
#
# Implement nonmaximal downsampling down converter for one bin, extend [HARRIS Fig 6.14].
#
# The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of frequency bins, is DFT size. The polyphase
# FIR structure has Nphases = Ndft branches, to fit the requested number of bins. The polyphase FIR structure
# is maximally downsampled (= critically sampled) for Ndown = Ndft, but it can support any Ndown <= Ndft. The
# input data shifts in per Ndown samples, so it appears in different branches when Ndown < Ndft and a new block
# is shifted in. Therefore the input data cannot be FIR filtered per branch for the whole input x. Instead it
# needs to be FIR filtered per block of Ndown input samples from x, using pfs.polyDelays in pfs.filter_block().
#
# . see downsample()
#
# Input:
# . x: Input signal x[n]
# . Ndown: downsample factor
# . k: Index of BPF center frequency w_k = 2 pi k / Ndft
# . Ndft: DFT size, number of polyphases in FIR structure
# . coefs: prototype FIR filter coefficients for anti aliasing BPF
# - verbosity: when > 0 print() status, else no print()
# Return:
# . y: Downsampled and down converted output signal y[m], m = n // D for bin
# k. Complex baseband signal.
# """
# a = [1.0] # FIR b = coefs, a = 1
#
# # Polyphase implementation to avoid calculating values that are removed
# # coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# # polyCoefs = [[ 3, 7, 11],
# # [ 2, 6, 10],
# # [ 1, 5, 9],
# # [ 0, 4, 8]])
# pfs = PolyPhaseFirFilterStructure(Ndft, coefs)
#
# # Prepended x with Ndown - 1 zeros
# Lzeros = Ndown - 1
# Ldown = (Lzeros + len(x)) // Ndown # number of downsampled samples
# Lin = Ldown * Ndown - Lzeros # number of input samples from x
#
# # Prepare storage per block
# pfsOutput = np.zeros((Ldown, Ndft))
# binOutput = np.zeros((down)
#
# xData = np.zeros(Ndown)
# xData[Ndown - 1] = x[0]
# for b in range(Ldown):
# # Filtered signal
# pfsOutput[b] = pfs.filter_block(xData)
# # Phase rotate per polyphase for bin k, due to delay line at branch inputs [HARRIS Eq 6.8]
# for p in range(Ndft):
# pCommutator = Ndft - 1 - p
# pfsOutput[p] = pfsOutput[p] * np.exp(1j * 2 * np.pi * pCommutator * k / Ndft)
# # Sum the branch outputs to get single downsampled and downconverted output value
# y = np.sum(pfsOutput, axis=0)
#
#
#
#
# # Filter x per polyphase, order in polyCoefs accounts for commutator [HARRIS Fig 6.12, 6.13]
# polyY = np.zeros((Ndown, Nxp))
# for p in range(Ndown):
# polyY[p] = signal.lfilter(polyCoefs[p], a, polyX[p])
#
# # Phase rotate per polyphase, due to delay line at branch inputs [HARRIS Eq 6.8]
# # . polyY can use index p, because order in polyY accounts for commutator,
# # . phase rotator needs to use pCommutator to account for commutator, to fit
# # order in polyY and polyCoefs
# polyYC = np.zeros((Ndown, Nxp), dtype='cfloat')
# for p in range(Ndown):
# pCommutator = Ndown - 1 - p
# polyYC[p] = polyY[p] * np.exp(1j * 2 * np.pi * pCommutator * k / Ndown)
#
# # Sum the branch outputs to get single downsampled and downconverted output value
# y = np.sum(polyYC, axis=0)
#
# if verbosity:
# print('> downsample_bpf():')
# print('. len(x) =', str(len(x)))
# print('. Ndown =', str(Ndown))
# print('. Nx =', str(Nx))
# print('. Nxp =', str(Nxp))
# print('. len(y) =', str(len(y))) # = Nxp
# print('. k =', str(k))
# print('')
# return y
def
resample
(
x
,
Nup
,
Ndown
,
coefs
,
verify
=
False
,
verbosity
=
1
):
# interpolate and decimate by Nup / Ndown
"""
Resample x by factor U / D = Nup / Ndown
...
...
@@ -618,9 +516,9 @@ def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate an
downsampled samples. Each phase uses a different set of coefficients from
the LPF to filter Ndown delay phases of the input sequence x.
Resampling is upsampling with downsampling by phase selection
Resampling is upsampling with downsampling by phase selection
:
The resampling is done by first upsampling and then downsampling, because
then only one share
e
d LPF is needed. For upsampling an LPF is always
then only one shared LPF is needed. For upsampling an LPF is always
needed, because it has to construct the inserted Nup - 1 zero values. For
downsampling the LPF of the upsampling already has restricted the input
band width (BW), provided that the LPF has BW < fNyquist / U and BW <
...
...
@@ -682,8 +580,7 @@ def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate an
v
[
0
]
=
x
v
=
v
.
T
.
reshape
(
1
,
Nup
*
Nx
)[
0
]
# upsample
# . LPF
a
=
[
1.0
]
# FIR b = coefs, a = 1
w
=
signal
.
lfilter
(
coefs
,
a
,
v
)
w
=
signal
.
lfilter
(
coefs
,
c_firA
,
v
)
# . Downsampling with calculating values that are removed
yVerify
=
np
.
zeros
(
Ndown
*
Nyp
)
yVerify
[
0
:
Ny
]
=
w
[
0
:
Ny
]
...
...
@@ -696,7 +593,6 @@ def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate an
else
:
print
(
'
. ERROR: wrong resample result
'
)
return
False
if
verbosity
:
print
(
'
> Log resample():
'
)
print
(
'
. len(x) =
'
,
str
(
len
(
x
)))
...
...
@@ -707,3 +603,126 @@ def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate an
print
(
'
. len(y) =
'
,
str
(
len
(
y
)))
print
(
''
)
return
y
###############################################################################
# Single bandpass channel up and down sampling and up and down conversion
###############################################################################
def
phasor_arr
(
k
,
Ndft
,
sign
):
"""
Return array of phasors: exp(+-j 2pi k / Ndft) for k in 0 : Ndft - 1
"""
if
sign
==
'
positive
'
:
return
np
.
array
([
np
.
exp
(
2j
*
np
.
pi
*
p
*
k
/
Ndft
)
for
p
in
range
(
Ndft
)])
else
:
# 'negative'
return
np
.
array
([
np
.
exp
(
-
2j
*
np
.
pi
*
p
*
k
/
Ndft
)
for
p
in
range
(
Ndft
)])
def
maximal_downsample_bpf
(
x
,
Ndown
,
k
,
coefs
,
verbosity
=
1
):
"""
BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown.
Implement maximal downsampling down converter for one bin (= critically
sampled) [HARRIS Fig 6.14].
The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of
frequency bins, is DFT size. The downsampling is maximal so Ndown = Ndft.
The polyphase structure has Nphases = Ndown branches, so the input x
data that shifts in remains in each branch. Therefore each branch can be
FIR filtered independently for the whole input x using polyX.
Input:
. x: Input signal x[n]
. Ndown: downsample factor
. k: Index of BPF center frequency w_k = 2 pi k / Ndown
. coefs: prototype FIR filter coefficients for anti aliasing BPF
- verbosity: when > 0 print() status, else no print()
Return:
. yc: Downsampled and down converted output signal yc[m], m = n // D for
bin k. Complex baseband signal.
"""
# Polyphase FIR filter input x
polyY
,
Nx
,
Nxp
=
polyphase_frontend
(
x
,
Ndown
,
coefs
,
'
down
'
)
# Phase rotate per polyphase for bin k, due to delay line at branch inputs
# [HARRIS Eq 6.8]
polyYC
=
np
.
zeros
((
Ndown
,
Nxp
),
dtype
=
'
cfloat
'
)
phasors
=
phasor_arr
(
k
,
Ndown
,
'
positive
'
)
for
p
in
range
(
Ndown
):
polyYC
[
p
]
=
polyY
[
p
]
*
phasors
[
p
]
# row = row * scalar
# Sum the branch outputs to get single downsampled and downconverted output
# complex baseband value yc.
yc
=
np
.
sum
(
polyYC
,
axis
=
0
)
if
verbosity
:
print
(
'
> Log maximal_downsample_bpf():
'
)
print
(
'
. len(x) =
'
,
str
(
len
(
x
)))
print
(
'
. Nx =
'
,
str
(
Nx
))
print
(
'
. Nxp =
'
,
str
(
Nxp
))
print
(
'
. len(yc) =
'
,
str
(
len
(
yc
)))
# = Nxp
print
(
'
. Ndown =
'
,
str
(
Ndown
))
print
(
'
. k =
'
,
str
(
k
))
print
(
''
)
return
yc
def
non_maximal_downsample_bpf
(
x
,
Ndown
,
k
,
Ndft
,
coefs
,
verbosity
=
1
):
"""
BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown
Implement nonmaximal downsampling down converter for one bin, extend
[HARRIS Fig 6.14].
The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of
frequency bins, is DFT size. The polyphase FIR structure has Nphases = Ndft
branches, to fit the requested number of bins. The polyphase FIR structure
is maximally downsampled (= critically sampled) for Ndown = Ndft, but it
can support any Ndown <= Ndft. The input data shifts in per Ndown samples,
so it appears in different branches when Ndown < Ndft and a new block is
shifted in. Therefore the input data cannot be FIR filtered per branch for
the whole input x. Instead it needs to be FIR filtered per block of Ndown
input samples from x, using pfs.polyDelays in pfs.filter_block().
Input:
. x: Input signal x[n]
. Ndown: downsample factor
. k: Index of BPF center frequency w_k = 2 pi k / Ndft
. Ndft: DFT size, number of polyphases in PFS FIR filter
. coefs: prototype LPF FIR filter coefficients for anti aliasing BPF
- verbosity: when > 0 print() status, else no print()
Return:
. yc: Downsampled and down converted output signal yc[m], m = n // D for
bin k. Complex baseband signal.
"""
# Prepend x with Ndown - 1 zeros, and represent x in Nblocks of Ndown
# samples
Nzeros
=
Ndown
-
1
xBlocks
,
Nx
,
Nblocks
=
polyphase_data_for_downsampling_whole_x
(
x
,
Ndown
,
Nzeros
)
print
(
xBlocks
[:,
0
])
# Prepare output
yc
=
np
.
zeros
(
Nblocks
,
dtype
=
'
cfloat
'
)
# PFS with Ndft polyphases
pfs
=
PolyPhaseFirFilterStructure
(
Ndft
,
coefs
)
phasors
=
phasor_arr
(
k
,
Ndft
,
'
positive
'
)
for
b
in
range
(
Nblocks
):
# Filter block
inData
=
xBlocks
[:,
b
]
pfsData
=
pfs
.
filter_block
(
inData
)
# Phase rotate polyphases for bin k [HARRIS Eq 6.8]
pfsBinData
=
pfsData
*
phasors
# Sum the polyphases to get single downsampled and downconverted output value
yc
[
b
]
=
np
.
sum
(
pfsBinData
)
if
verbosity
:
print
(
'
> non_maximal_downsample_bpf():
'
)
print
(
'
. len(x) =
'
,
str
(
len
(
x
)))
print
(
'
. Nx =
'
,
str
(
Nx
))
print
(
'
. Nblocks =
'
,
str
(
Nblocks
))
print
(
'
. len(yc) =
'
,
str
(
len
(
yc
)))
# = Nblocks
print
(
'
. Ndown =
'
,
str
(
Ndown
))
print
(
'
. Ndft =
'
,
str
(
Ndft
))
print
(
'
. k =
'
,
str
(
k
))
print
(
''
)
return
yc
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment