diff --git a/applications/lofar2/model/rtdsp/multirate.py b/applications/lofar2/model/rtdsp/multirate.py index d2481d2c6b7beee69246ecc72cbbed5f96a9290b..c3991477a4a4a85ed11847c57320a7676dc612c8 100644 --- a/applications/lofar2/model/rtdsp/multirate.py +++ b/applications/lofar2/model/rtdsp/multirate.py @@ -43,6 +43,13 @@ from .utilities import c_rtol, c_atol, ceil_div c_firA = [1.0] # FIR b = coefs, a = 1 +def zeros(shape, cmplx=False): + if cmplx: + return np.zeros(shape, dtype='cfloat') + else: + return np.zeros(shape) + + def down(x, D, phase=0): """Downsample x[n] by factor D, xD[m] = x[m D], m = n // D @@ -57,7 +64,7 @@ def up(x, U): After every sample in x insert U-1 zeros. """ - xU = np.zeros(len(x) * U) + xU = zeros(len(x) * U, np.iscomplexobj(x)) xU[0::U] = x return xU @@ -75,6 +82,9 @@ class PolyPhaseFirFilterStructure: using the FIR filter. The spectral aliasing and spectral replication are due to the sample rate change, not to the implementation structure. + The FIR data can be real to process real data, or complex to process + equivalent baseband data. The FIR coefficients are real. + The PFS is is suitable for downsampling and for upsampling: - Upsampling uses the PFS as Transposed Direct-Form FIR filter, where the commutator selects the polyphases from phase 0 to Nphases - 1 to yield @@ -89,9 +99,11 @@ class PolyPhaseFirFilterStructure: VAIDYANATHAN 4.3]. Input: - . coefs: FIR coefficients b[0 : Nphases * Ntaps - 1]. + . coefs: Real FIR coefficients b[0 : Nphases * Ntaps - 1]. . Nphases: number of polyphases, is number of rows (axis=0) in the polyphase structure. + . cmplx: Define PFS delays for real (when False) or complex (when True) + FIR data. Derived: . Ntaps: number of taps per polyphase, is number of columns (axis=1) in the polyphase structure. @@ -148,11 +160,12 @@ class PolyPhaseFirFilterStructure: . map_to_delay_line() . map_to_poly_delays() """ - def __init__(self, Nphases, coefs): + def __init__(self, Nphases, coefs, cmplx=False): self.coefs = coefs self.Ncoefs = len(coefs) self.Nphases = Nphases # branches, rows self.Ntaps = ceil_div(self.Ncoefs, Nphases) # taps, columns + self.cmplx = cmplx self.init_poly_coeffs() self.reset_poly_delays() @@ -177,12 +190,12 @@ class PolyPhaseFirFilterStructure: Nphases = 4 rows (axis=0) Ntaps = 3 columns (axis=1) """ - polyCoefs = np.zeros(self.Nphases * self.Ntaps) + polyCoefs = np.zeros(self.Nphases * self.Ntaps) # real polyCoefs[0 : self.Ncoefs] = self.coefs self.polyCoefs = polyCoefs.reshape((self.Ntaps, self.Nphases)).T def reset_poly_delays(self): - self.polyDelays = np.zeros((self.Nphases, self.Ntaps)) + self.polyDelays = zeros((self.Nphases, self.Ntaps), self.cmplx) def map_to_delay_line(self): delayLine = self.polyDelays.T.reshape(-1) @@ -280,7 +293,7 @@ def polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros): # . prepend x with Ndown - 1 zeros # . skip any last remaining samples from x, that are not enough yield a new # output FIR sum. - polyX = np.zeros(Ndown * Nxp) + polyX = zeros(Ndown * Nxp, np.iscomplexobj(x)) polyX[Ndown - 1] = x[0] polyX[Ndown:] = x[1 : Nx] # . Store data in time order per branch, so with oldest data left, to match @@ -333,7 +346,7 @@ def polyphase_frontend(x, Nphases, coefs, sampling): polyphase. """ # Use polyphase FIR filter coefficients from pfs class. - pfs = PolyPhaseFirFilterStructure(Nphases, coefs) + pfs = PolyPhaseFirFilterStructure(Nphases, coefs, np.iscomplexobj(x)) # Define polyphases for whole data signal x with Nx samples, instead of # using polyDelays per Ntaps from pfs class, to be able to use @@ -346,10 +359,11 @@ def polyphase_frontend(x, Nphases, coefs, sampling): # each use all x to yield Nup output samples per input sample from x. # The commutator index order for upsampling is p = 0, 1, .., Nup - 1, # so from top to bottom in the PFS. - polyY = np.zeros((Nup, Nx)) + polyY = zeros((Nup, Nx), np.iscomplexobj(x)) + pCommutator = range(Nup) for p in pCommutator: - polyY[p] = signal.lfilter(pfs.polyCoefs[p], c_firA, x) + polyY[p] = Nup * signal.lfilter(pfs.polyCoefs[p], c_firA, x) else: # 'down': # . Prepend x with Ndown - 1 zeros, to have y[m] for m = 0 start at @@ -366,7 +380,7 @@ def polyphase_frontend(x, Nphases, coefs, sampling): # commutator index order is only relevant when the branches are # calculated sequentially to reuse the same hardware, because for the # output y the branches are summed anyway. - polyY = np.zeros((Ndown, Nxp)) + polyY = zeros((Ndown, Nxp), np.iscomplexobj(x)) pCommutator = np.flip(np.arange(Ndown)) for p in pCommutator: polyY[p] = signal.lfilter(pfs.polyCoefs[p], c_firA, polyX[p]) @@ -418,10 +432,10 @@ def upsample(x, Nup, coefs, verify=False, verbosity=1): # interpolate # Inefficient upsampling implementation with multiply by zero values. # Verify that x --> U --> LPF --> y yields identical result y as with # using the PFS: x --> PFS FIR --> y. - xZeros = np.zeros((Nup, Nx)) + xZeros = zeros((Nup, Nx), np.iscomplexobj(x)) xZeros[0] = x xZeros = xZeros.T.reshape(1, Nup * Nx)[0] # upsample - yVerify = signal.lfilter(coefs, c_firA, xZeros) # LPF + yVerify = Nup * signal.lfilter(coefs, c_firA, xZeros) # LPF print('> Verify upsample():') if np.allclose(y, yVerify, rtol=c_rtol, atol=c_atol): print(' . PASSED: correct upsample result') @@ -481,7 +495,7 @@ def downsample(x, Ndown, coefs, verify=False, verbosity=1): # decimate # Inefficient downsampling implementation with calculating values that # are removed. Verify that x --> LPF --> D --> y yields identical # result y as with using the PFS: x --> PFS FIR --> y. - yVerify = np.zeros(Ndown * Nxp) + yVerify = zeros(Ndown * Nxp, np.iscomplexobj(x)) yVerify[0 : Nx] = signal.lfilter(coefs, c_firA, x[0 : Nx]) # LPF yVerify = yVerify.reshape(Nxp, Ndown).T # polyphases yVerify = yVerify[0] # downsample by D @@ -584,7 +598,7 @@ def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate an v[0] = x v = v.T.reshape(1, Nup * Nx)[0] # upsample # . LPF - w = signal.lfilter(coefs, c_firA, v) + w = Nup * signal.lfilter(coefs, c_firA, v) # . Downsampling with calculating values that are removed yVerify = np.zeros(Ndown * Nyp) yVerify[0 : Ny] = w[0 : Ny] @@ -644,7 +658,8 @@ def time_shift_phasor_arr(k, Ndown, Ndft, Msamples): def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): - """BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown. + """BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown and + Ndown = Ndft. Implement maximal downsampling down converter for one bin (= critically sampled) [HARRIS Fig 6.14]. @@ -670,8 +685,8 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): # Phase rotate per polyphase for bin k, due to delay line at branch inputs # [HARRIS Eq 6.8] - polyYC = np.zeros((Ndown, Nxp), dtype='cfloat') kPhasors = unit_circle_loops_phasor_arr(k, Ndown, 'positive') + polyYC = np.zeros((Ndown, Nxp), dtype='cfloat') for p in range(Ndown): polyYC[p] = polyY[p] * kPhasors[p] # row = row * scalar @@ -691,6 +706,55 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): return yc +def maximal_upsample_bpf(x, Nup, k, coefs, verbosity=1): + """BPF x at bin k in range(Ndft) and downsample x by factor D = Nup + = Ndft. + + Implement maximal upsampling upconverter for one bin (= critically + sampled) [HARRIS Fig. 7.16]. + + The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of + frequency bins, is DFT size. The upsampling is maximal so Nup = Ndft. + The polyphase structure has Nphases = Nup branches, so the input x + data that shifts in remains in each branch. Therefore each branch can be + FIR filtered independently for the whole input x using polyX. + + Input: + . x: Input signal x[n] + . Nup: upsample factor + . k: Index of BPF center frequency w_k = 2 pi k / Nup + . coefs: prototype FIR filter coefficients for anti aliasing and + interpolating BPF + - verbosity: when > 0 print() status, else no print() + Return: + . yc: Upsampled and upconverted output signal yc[n], n = m D for + bin k. Complex signal, so positive frequencies only. + """ + # Polyphase FIR filter input x + polyY, Nx, Nxp = polyphase_frontend(x, Nup, coefs, 'up') + + # Phase rotate per polyphase for bin k, due to delay line at branch inputs + # [HARRIS Eq 7.8 = 6.8] + kPhasors = unit_circle_loops_phasor_arr(k, Nup, 'positive') + polyYC = np.zeros((Nup, Nxp), dtype='cfloat') + for p in range(Nup): + polyYC[p] = polyY[p] * kPhasors[p] # row = row * scalar + + # Output Nup samples yc[m] for every input sample x[n] + yc = polyYC.T.reshape(1, Nup * Nx)[0] + + if verbosity: + print('> Log maximal_downsample_bpf():') + print(' . len(x) =', str(len(x))) + print(' . Nx =', str(Nx)) + print(' . Nxp =', str(Nxp)) + print(' . len(yc) =', str(len(yc))) # = Nxp + print(' . Nup =', str(Nup)) + print(' . k =', str(k)) + print('') + return yc + + def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1): """BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown