Newer
Older
#! /usr/bin/env python3
###############################################################################
#
# Copyright 2024
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
# Author: Eric Kooistra
# Purpose: Multirate functions for DSP
# Description:
#
# Usage:
# . Use [2] to verify this code.
#
# References:
# [1] dsp_study_erko.txt
# [2] http://localhost:8888/notebooks/pfb_os/up_downsampling.ipynb
#
# Books:
# . HARRIS 6 title Figure
# . LYONS
# . CROCHIERE
import numpy as np
from scipy import signal
from .utilities import c_rtol, c_atol, ceil_div
def down(x, D, phase=0):
"""Downsample x[n] by factor D, xD[m] = x[m D], m = n // D
Keep every D-th sample in x and skip every D-1 samples after that sample.
"""
xD = x[phase::D]
return xD
def up(x, U):
"""Upsample x by factor U, xU[m] = x[n] when m = n U, else 0.
After every sample in x insert U-1 zeros.
"""
xU = np.zeros(len(x) * U)
xU[0::U] = x
return xU
###############################################################################
# Polyphase filter structure for input block processing
###############################################################################
class PolyPhaseFirFilterStructure:
"""Polyphase FIR filter structure (PFS) per block of data
Purpose of PFS implementation is to avoid multiply by zero values in case
of upsampling and to avoid calculating samples that will be discarded in
case of downsampling. The output result of the PFS FIR is identical to
using the FIR filter. The spectral aliasing and spectral replication are
due to the sample rate change, not to the implementation structure.
The PFS is is suitable for downsampling and for upsampling:
- Upsampling uses the PFS as Transposed Direct-Form FIR filter, where the
commutator selects the polyphases from phase 0 to Nphases - 1 to yield
output samples for every input sample [HARRIS Fig 7.11, 7.12, CROCHIERE
Fig 3.25].
- Downsampling uses the PFS as a Direct-Form FIR filter, where the
commutator selects the polyphases from phase Nphase - 1 downto 0 to sum
samples from the past for one output sample [HARRIS Fig 6.12, 6.13,
CROCHIERE Fig 3.29].
- In both cases the commutator rotates counter clockwise, because the PFS
uses a delay line of z^(-1) called type 1 structure [CROCHIERE 3.3.3,
VAIDYANATHAN 4.3].
Input:
. coefs: FIR coefficients b[0 : Nphases * Ntaps - 1].
. Nphases: number of polyphases, is number of rows (axis=0) in the
polyphase structure.
Derived:
. Ntaps: number of taps per polyphase, is number of columns (axis=1) in
the polyphase structure.
. polyCoefs: FIR coefficients storage with Nphases rows and Ntaps columns.
. polyDelays: FIR taps storage with Nphases rows and Ntaps columns.
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
Storage of FIR coefficients and FIR data:
Stream of input samples x[n], n >= 0 for increasing time. Show index n
also as value so x[n] = n:
oldest sample newest sample
x v v
samples [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, ...]
blocks [ 0, 1, 2, 3]
^
newest block
The delayLine index corresponds to the FIR coefficient index k of
b[k] = impulse response h[k]. For Ncoefs = 12, with Nphases = 4 rows and
Ntaps = 3 columns:
polyCoefs = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11] = b[k]
row = polyphase
polyCoefs = [[0, 4, 8], 0
[1, 5, 9], 1
[2, 6,10], 2
[3, 7,11]] 3
column: 0, 1, 2
Shift x[n] samples into delayLine from left, show sample index n in (n)
and delayLine index k in [k]. Shift in per sample or per block.
shift in -->(15,14,13,12,11,10, 9, 8, 7, 6, 5, 4) --> shift out
delayLine = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11]
For polyDelays shift in from left-top (n = 15 is newest input sample)
and shift-out from right-bottom (n = 4 is oldest sample).
v v
polyDelays = [[0, 4, 8], ((15,11, 7),
[1, 5, 9], (14,10, 6),
[2, 6,10], (13, 9, 5),
[3, 7,11]] (12, 8, 4))
v v
Output sample y[n] = sum(polyCoefs * polyDelays).
For downsampling by Ndown = Nphases shift blocks of Ndown samples in
from left in polyDelays, put newest block[3] = x[12,13,14,15] in first
column [0] of polyDelays.
Store input samples in polyDelays structure, use mapping to access as
delayLine:
. map_to_delay_line()
. map_to_poly_delays()
self.coefs = coefs
self.Ncoefs = len(coefs)
self.Nphases = Nphases # branches, rows
self.Ntaps = ceil_div(self.Ncoefs, Nphases) # taps, columns
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
self.init_poly_coeffs()
self.reset_poly_delays()
def init_poly_coeffs(self):
"""Polyphase structure for FIR filter coefficients coefs in Nphases.
Input:
. coefs = prototype FIR filter coefficients (= b[k] = impulse response)
Return:
. If Ncoefs < Nphases * Ntaps, then pad coefs with zeros.
. E.g. coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
coefs.reshape((4, 3)) = [[0, 1, 2],
[3, 4, 5],
[6, 7, 8],
[9,10,11])
. Therefore for Nphases = 4 rows and Ntaps = 3 columns use:
polyCoefs = coefs.reshape((3, 4)).T
= [[0, 4, 8], # p = 0, H0(z)
[1, 5, 9], # p = 1, H1(z)
[2, 6,10], # p = 2, H2(z)
[3, 7,11]) # p = 3, H3(z),
Nphases = 4 rows (axis=0)
Ntaps = 3 columns (axis=1)
"""
polyCoefs = np.zeros(self.Nphases * self.Ntaps)
polyCoefs[0 : self.Ncoefs] = self.coefs
self.polyCoefs = polyCoefs.reshape((self.Ntaps, self.Nphases)).T
def reset_poly_delays(self):
self.polyDelays = np.zeros((self.Nphases, self.Ntaps))
return delayLine
def map_to_poly_delays(self, delayLine):
self.polyDelays = delayLine.reshape((self.Ntaps, self.Nphases)).T
def shift_in_data(self, inData, flipped=False):
"""Shift block of data into the polyDelays structure.
View polyDelays as delay line if L < Nphases. Shift in from the left
in the delay line, so last (= newest) sample in inData will appear at
most left of the delay line. If L = Nphases, then it is possible to
shift the data directly into the first (= left) column of polyDelays.
Input:
. inData: Block of one or more input samples with index as time index
n in inData[n], so oldest sample at index 0 and newest sample at
index -1.
. flipped: False then inData order is inData[n] and still needs to be
flipped. If True then the inData is already flipped.
xData = inData if flipped else np.flip(inData)
if L < self.Nphases:
delayLine = self.map_to_delay_line()
# Equivalent code:
# delayLine = np.concatenate((inData, delayLine[L:]))
delayLine = np.roll(delayLine, L)
delayLine[:L] = xData
self.map_to_poly_delays(delayLine)
else:
# Equivalent code for L == Nphases: Shift in inData block directly
# at column 0
self.polyDelays = np.roll(self.polyDelays, 1, axis=1)
self.polyDelays[:, 0] = xData
def filter_block(self, inData, flipped=False):
Input:
. inData: block of input data with length <= Nphases, time index n in
inData[n] increments, so inData[0] is oldest data and inData[-1] is
newest data. The inData block size is = 1 for upsampling or > 1
and <= Nphases for downsampling.
Return:
. pfsData: block of polyphase FIR filtered output data for Nphase, with
pfsData[p] and p = 0:Nphases-1 from top to bottom.
. flipped: False then inData order is inData[n] and still needs to be
flipped. If True then the inData is already flipped.
# Shift in one block of input data (1 <= len(inData) <= Nphases)
self.shift_in_data(inData, flipped)
zData = self.polyDelays * self.polyCoefs
# Sum FIR taps per polyphase
pfsData = np.sum(zData, axis=1)
# Output block of Nphases filtered output data
###############################################################################
# Up, down, resample low pass input signal
###############################################################################
def polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros):
"""Polyphase data structure for downsampling whole signal x.
The polyphase structure has Ndown branches and branch size suitable to use
lfilter() once per polyphase branch for whole signal x. Instead of using
pfs.polyDelays per pfs.Ntaps.
. Prepend x with Nzeros = Ndown - 1 zeros to have first down sampled sample
at m = 0 start at n = 0.
. Skip any remaining last samples from x, that are not enough to yield a
new output FIR sum.
. Ndown: downsample rate and number of polyphase branches
. polyX: polyphase data structure with size (Ndown, Nxp) for Ndown branches
. Nx: Total number of samples from x, including prepended Ndown - 1 zeros.
. Nxp: Total number of samples used from x per polyphase branch, is the
number of samples Ny, that will be in downsampled output y[m] = x[m D],
for m = 0, 1, 2, ..., Nxp - 1.
Nxp = (Nzeros - 1 + Lx) // Ndown # Number of samples per polyphase
Nx = 1 + Ndown * (Nxp - 1) # Used number of samples from x
# . skip any last remaining samples from x, that are not enough yield a new
# output FIR sum.
# . Store data in time order per branch, so with oldest data left, to match
# use with lfilter(). Note this differs from order of tap data in
# pfs.polyDelays where newest data is left, because the block data is
# flipped by shift_in_data(), to match use in pfs.filter_block, so that
# it can use pfs.polyDelays * pfs.polyCoefs to filter the block.
# . The newest sample x[-1] has to be in top branch of polyX and the oldest
# sample x[0] in bottom branch, to match branch order of 0:Ndown-1 from
# top to bottom in the pfs.polyCoefs, therefore do flipud(polyX). This
# flipud() is similar as the flip() per block in shift_in_data().
#
# x[0] is oldest x[-1] is newest
# | |
# x[n] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15]
# (0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15), x[n] = n
# newest
# poly poly v |
# Coefs = [[0, 4, 8], Delays = ((15,11, 7), polyX = ((3, 7,11,15),
# [1, 5, 9], (14,10, 6), (2, 6,10,14),
# [2, 6,10], (13, 9, 5), (1, 5, 9,13),
# [3, 7,11]] (12, 8, 4)) (0, 4, 8,12))
# v |
# oldest
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
def polyphase_frontend(x, Nphases, coefs, sampling):
"""Calculate PFS FIR filter output of x for Nphases.
The polyY[Nphases] output of this PFS frontend can be:
. summed for dowsampling by factor D = Nphases
. used as is for upsampling by factor U = Nphases
. used with a range of Nphases LOs for a single channel downsampler
downconverter, or upsampler upconverter
. used with a IDFT for a analysis filterbank (PFB), or synthesis PFB.
Input:
. x: Input signal x[n]
. Nphases: number of polyphase branches in PFS
. coefs: prototype FIR filter coefficients for anti aliasing LPF. The
len(coefs) typically is multiple of Nphases. If shorter, then the
coefs are extended with zeros.
. sampling: 'up' or 'down'
Return:
. polyY[Nphases]: Output of the FIR filtered branches in the PFS.
. Nx = np.size(polyY), total number of samples from x
. Nxp = np.size(polyY, axis=1), total number of samples from x per
polyphase.
"""
# Use polyphase FIR filter coefficients from pfs class.
pfs = PolyPhaseFirFilterStructure(Nphases, coefs)
# Define polyphases for whole data signal x with Nx samples, instead of
# using polyDelays per Ntaps from pfs class, to be able to use
# signal.lfilter() per polyphase branch for the whole data signal x.
if sampling == 'up':
Nx = len(x)
Nxp = Nx
Nup = Nphases
# Filter whole x per polyphase, so Nxp = Nx, because the Nup branches
# each use all x to yield Nup output samples per input sample from x.
# The commutator index order for upsampling is p = 0, 1, .., Nup - 1,
# so from top to bottom in the PFS.
polyY = np.zeros((Nup, Nx))
pCommutator = range(Nup)
for p in pCommutator:
polyY[p] = signal.lfilter(pfs.polyCoefs[p], c_firA, x)
else:
# 'down':
# . Prepend x with Ndown - 1 zeros, to have y[m] for m = 0 start at
# n = 0 of x[n].
# . Size of polyX is (Ndown, Nxp), and length of y is Ny = Nxp is
# length of each branch.
Ndown = Nphases
Nzeros = Ndown - 1
polyX, Nx, Nxp = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros)
# print(polyX[:, 0])
# Filter Ndown parts of x per polyphase, because the FIR filter output
# y will sum. The commutator index order for downsampling is p =
# Ndown - 1,..., 1, 0, so from bottom to top in the PFS. However, the
# commutator index order is only relevant when the branches are
# calculated sequentially to reuse the same hardware, because for the
# output y the branches are summed anyway.
polyY = np.zeros((Ndown, Nxp))
pCommutator = np.flip(np.arange(Ndown))
for p in pCommutator:
polyY[p] = signal.lfilter(pfs.polyCoefs[p], c_firA, polyX[p])
return polyY, Nx, Nxp
def upsample(x, Nup, coefs, verify=False, verbosity=1): # interpolate
Input:
. x: Input signal x[n]
. Nup: upsample (interpolation) factor
. coefs: prototype FIR filter coefficients for anti aliasing LPF
. verify: when True then verify that output y is the same when calculated
directly or when calculated using the polyphase implementation.
Return:
. y: Upsampled output signal y[m]
Assumptions:
. x[n] = 0 for n < 0
. m = n * U, so first upsampled output sample starts at m = 0 based on
n = 0
. no y[m] for m < 0
. insert U - 1 zeros after each x[n] to get v[m], so len(v) = len(y) =
U * len(x)
. use coefs as anti aliasing filter, must be LPF with BW < fNyquist / U
. len(coefs) typically is multiple of Nup. If shorter, then the coefs are
extended with zeros.
Remarks:
. The input sample period is ts and the output sample period of the
upsampled (= interpolated signal) is tsUp = ts / U
. The group delay is (Ncoefs - 1) / 2 * tsUp = (Ncoefs - 1) / U / 2 * ts.
With odd Ncoefs and symmetrical coefs to have linear phase, the group
delay is an integer number of tsUp periods, so:
- when (Ncoefs - 1) % 2 == 0, then the group delay is an integer number
of tsUp periods
- when (Ncoefs - 1) % (2 * U) == 0, then the group delay is an integer
number of ts periods.
# Polyphase FIR filter input x
polyY, Nx, _ = polyphase_frontend(x, Nup, coefs, 'up')
# Output Nup samples y[m] for every input sample x[n]
y = polyY.T.reshape(1, Nup * Nx)[0]
if verify:
# Inefficient upsampling implementation with multiply by zero values.
# Verify that x --> U --> LPF --> y yields identical result y as with
# using the PFS: x --> PFS FIR --> y.
xZeros = np.zeros((Nup, Nx))
xZeros[0] = x
xZeros = xZeros.T.reshape(1, Nup * Nx)[0] # upsample
yVerify = signal.lfilter(coefs, c_firA, xZeros) # LPF
if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
else:
print(' . Nup =', str(Nup))
print(' . Nx =', str(Nx))
print(' . len(y) =', str(len(y)))
return y
def downsample(x, Ndown, coefs, verify=False, verbosity=1): # decimate
Input:
. x: Input signal x[n]
. Ndown: downsample (decimation) factor
. coefs: prototype FIR filter coefficients for anti aliasing LPF
. verify: when True then verify that output y is the same when calculated
directly or when calculated using the polyphase implementation.
Return:
. y: Downsampled output signal y[m]
Assumptions:
. x[n] = 0 for n < 0
. m = n // D, so first downsampled output sample starts at m = 0 based on
n = 0
. no y[m] for m < 0
. skip D - 1 values zeros after each x[n] to get y[m], so len(y) =
len(x) // D
. use coefs as anti aliasing filter, must be LPF with BW < fNyquist / D
. len(coefs) typically is multiple of Ndown. If shorter, then the coefs
are extended with zeros.
Remarks:
. The input sample period is ts and the output sample period of the
downsampled (= decimated signal) is tsDown = ts * D
. The group delay is (Ncoefs - 1) / 2 * ts. With odd Ncoefs and symmetrical
coefs to have linear phase, the group delay is an integer number of ts
periods, so:
- when (Ncoefs - 1) % 2 == 0, then the group delay is an integer
number of ts periods
- when (Ncoefs - 1) % (2 * D) == 0, then the group delay is an integer
number of tsDown periods
# Polyphase FIR filter input x
polyY, Nx, Nxp = polyphase_frontend(x, Ndown, coefs, 'down')
# FIR filter sum
y = np.sum(polyY, axis=0)
if verify:
# Inefficient downsampling implementation with calculating values that
# are removed. Verify that x --> LPF --> D --> y yields identical
# result y as with using the PFS: x --> PFS FIR --> y.
yVerify = np.zeros(Ndown * Nxp)
yVerify[0 : Nx] = signal.lfilter(coefs, c_firA, x[0 : Nx]) # LPF
yVerify = yVerify.reshape(Nxp, Ndown).T # polyphases
yVerify = yVerify[0] # downsample by D
print('> Verify downsample():')
if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
else:
print(' . len(x) =', str(len(x)))
print(' . Ndown =', str(Ndown))
print(' . Nx =', str(Nx))
print(' . Nxp =', str(Nxp))
print(' . len(y) =', str(len(y))) # = Nxp
def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate and decimate by Nup / Ndown
"""Resample x by factor U / D = Nup / Ndown
x[n] --> Nup --> v[m] --> LPF --> w[m] --> Ndown --> y[k]
The w[m] is the result of upsampling x[n] by Nup. The LPF has narrowed the
pass band to suit both Nup and Ndown. Therefore the downsampling by Ndown
implies that only one in every Ndown samples of w[m] needs to be calculated
to get y[k] = w[k * D]. Hence resampling by Nup / Ndown takes less
processing than upsampling when Ndown > 1.
The upsampling structure outputs the interpolated samples via the separate
output phases. The same input sequence x is used to calculate all Nup
phases, whereby each phase uses a different set of coefficients from the
LPF. The downsampling structure sums the Ndown phases to output the
downsampled samples. Each phase uses a different set of coefficients from
the LPF to filter Ndown delay phases of the input sequence x.
Resampling is upsampling with downsampling by phase selection:
The resampling is done by first upsampling and then downsampling, because
then only one shared LPF is needed. For upsampling an LPF is always
needed, because it has to construct the inserted Nup - 1 zero values. For
downsampling the LPF of the upsampling already has restricted the input
band width (BW), provided that the LPF has BW < fNyquist / U and BW <
fNyquist / D. The downsampling therefore does not need an LPF and reduces
to selecting appropriate output phase of the upsampler. The upsampler then
only needs to calculate the output phase that will be used by the
downsampler, instead of calculating all output phases.
Input:
. x: Input signal x[n]
. Nup: upsample (interpolation) factor
. Ndown: downsample (decimation) factor
. coefs: FIR filter coefficients for anti aliasing LPF
. verify: when True then verify that output y is the same when calculated
directly or when calculated using the polyphase implementation.
Return:
. y: Resampled output signal y[m]
Assumptions:
. x[n] = 0 for n < 0
. k = m // D = (n * U) // D, so first resampled output sample starts at
k = 0 based on n = 0
. no y[k] for k < 0
. insert U - 1 zeros after each x[n] to get v[m], so len(v) = len(y) =
U * len(x)
. use coefs as anti aliasing filter, must be LPF with BW < fNyquist / U
and BW < fNyquist / D
. len(coefs) typically is multiple of Nup. If shorter, then the coefs are
extended with zeros.
Remarks:
. The group delay is the same as for the upsampler, because the
downsampling has no additional filtering.
"""
Nx = len(x)
Nv = Nup * Nx
# Nyp is number of samples from v, per polyphase in polyY
Nyp = (Nv + Ndown - 1) // Ndown
# Ny is used number of samples from v
Ny = 1 + Ndown * (Nyp - 1)
# Upsampling with LPF
w = upsample(x, Nup, coefs, verify=False)
# Downsampling selector
# This model is not efficient, because the upsample() has calculated all
# Nup phases, whereas it only needs to calculate the phase that is selected
# by the downsampling. However, it is considered not worth the effort to
# make the model more efficient, because then only parts of lfilter() in
# upsample() need to be calulated, so then lfilter() needs to be
# implemented manually and possibly in a for loop [CROCHIERE 3.3.4].
downSelect = np.arange(0, Nyp) * Ndown
y = w[downSelect]
if verify:
# Inefficient implementation
# . Upsampling with multiply by zero values
v[0] = x
v = v.T.reshape(1, Nup * Nx)[0] # upsample
# . LPF
# . Downsampling with calculating values that are removed
yVerify = np.zeros(Ndown * Nyp)
yVerify[0 : Ny] = w[0 : Ny]
yVerify = yVerify.reshape(Nyp, Ndown).T # polyphases
yVerify = yVerify[0] # downsample
if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol):
else:
print(' . len(x) =', str(len(x)))
print(' . Nx =', str(Nx))
print(' . len(v) =', str(len(v)))
print(' . Ny =', str(Ny))
print(' . Nyp =', str(Nyp))
print(' . len(y) =', str(len(y)))
return y
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
###############################################################################
# Single bandpass channel up and down sampling and up and down conversion
###############################################################################
def phasor_arr(k, Ndft, sign):
"""Return array of phasors: exp(+-j 2pi k / Ndft) for k in 0 : Ndft - 1
"""
if sign == 'positive':
return np.array([np.exp(2j * np.pi * p * k / Ndft) for p in range(Ndft)])
else: # 'negative'
return np.array([np.exp(-2j * np.pi * p * k / Ndft) for p in range(Ndft)])
def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1):
"""BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown.
Implement maximal downsampling down converter for one bin (= critically
sampled) [HARRIS Fig 6.14].
The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of
frequency bins, is DFT size. The downsampling is maximal so Ndown = Ndft.
The polyphase structure has Nphases = Ndown branches, so the input x
data that shifts in remains in each branch. Therefore each branch can be
FIR filtered independently for the whole input x using polyX.
Input:
. x: Input signal x[n]
. Ndown: downsample factor
. k: Index of BPF center frequency w_k = 2 pi k / Ndown
. coefs: prototype FIR filter coefficients for anti aliasing BPF
- verbosity: when > 0 print() status, else no print()
Return:
. yc: Downsampled and down converted output signal yc[m], m = n // D for
bin k. Complex baseband signal.
"""
# Polyphase FIR filter input x
polyY, Nx, Nxp = polyphase_frontend(x, Ndown, coefs, 'down')
# Phase rotate per polyphase for bin k, due to delay line at branch inputs
# [HARRIS Eq 6.8]
polyYC = np.zeros((Ndown, Nxp), dtype='cfloat')
phasors = phasor_arr(k, Ndown, 'positive')
for p in range(Ndown):
polyYC[p] = polyY[p] * phasors[p] # row = row * scalar
# Sum the branch outputs to get single downsampled and downconverted output
# complex baseband value yc.
yc = np.sum(polyYC, axis=0)
if verbosity:
print('> Log maximal_downsample_bpf():')
print(' . len(x) =', str(len(x)))
print(' . Nx =', str(Nx))
print(' . Nxp =', str(Nxp))
print(' . len(yc) =', str(len(yc))) # = Nxp
print(' . Ndown =', str(Ndown))
print(' . k =', str(k))
print('')
return yc
def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1):
"""BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown
Implement nonmaximal downsampling down converter for one bin, extend
[HARRIS Fig 6.14].
The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of
frequency bins, is DFT size. The polyphase FIR structure has Nphases = Ndft
branches, to fit the requested number of bins. The polyphase FIR structure
is maximally downsampled (= critically sampled) for Ndown = Ndft, but it
can support any Ndown <= Ndft. The input data shifts in per Ndown samples,
so it appears in different branches when Ndown < Ndft and a new block is
shifted in. Therefore the input data cannot be FIR filtered per branch for
the whole input x. Instead it needs to be FIR filtered per block of Ndown
input samples from x, using pfs.polyDelays in pfs.filter_block().
Input:
. x: Input signal x[n]
. Ndown: downsample factor
. k: Index of BPF center frequency w_k = 2 pi k / Ndft
. Ndft: DFT size, number of polyphases in PFS FIR filter
. coefs: prototype LPF FIR filter coefficients for anti aliasing BPF
- verbosity: when > 0 print() status, else no print()
Return:
. yc: Downsampled and down converted output signal yc[m], m = n // D for
bin k. Complex baseband signal.
"""
# Prepend x with Ndown - 1 zeros, and represent x in Nblocks of Ndown
# samples
Nzeros = Ndown - 1
xBlocks, Nx, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros)
# Prepare output
yc = np.zeros(Nblocks, dtype='cfloat')
# PFS with Ndft polyphases
pfs = PolyPhaseFirFilterStructure(Ndft, coefs)
phasors = phasor_arr(k, Ndft, 'positive')
for b in range(Nblocks):
# Filter block
inData = xBlocks[:, b]
pfsData = pfs.filter_block(inData, flipped=True)
# Phase rotate polyphases for bin k [HARRIS Eq 6.8]
pfsBinData = pfsData * phasors
# Sum the polyphases to get single downsampled and downconverted output value
yc[b] = np.sum(pfsBinData)
if verbosity:
print('> non_maximal_downsample_bpf():')
print(' . len(x) =', str(len(x)))
print(' . Nx =', str(Nx))
print(' . Nblocks =', str(Nblocks))
print(' . len(yc) =', str(len(yc))) # = Nblocks
print(' . Ndown =', str(Ndown))
print(' . Ndft =', str(Ndft))
print(' . k =', str(k))
print('')
return yc