diff --git a/applications/lofar2/model/rtdsp/multirate.py b/applications/lofar2/model/rtdsp/multirate.py index 378d53a54277974b8c764b7a80073706251f1c1b..f7106bf78de04527711d63de1e96ffde740359cc 100644 --- a/applications/lofar2/model/rtdsp/multirate.py +++ b/applications/lofar2/model/rtdsp/multirate.py @@ -102,14 +102,54 @@ class PolyPhaseFirFilterStructure: # newest sample [15] # samples [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] # blocks [ 0, 1, 2, 3] - # shift blocks to next column in polyDelays - # put newest block[3] in first column of polyDelays + # + # Shift blocks to next column in polyDelays, put newest + # block[3] in first column of polyDelays: + # + # shift out, [0] is oldest input sample + # ^ # polyDelays = [[ 8, 4, 0], # [ 9, 5, 1], # [10, 6, 2], # [11, 7, 3]]) + # ^ + # shift in, [11] is newest input sample + # + # delayLine = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] <-- shift in + # + # Store as polyDelays structure, use mapping to access as delayLine: + # . map_to_delay_line() + # . map_to_poly_delays() self.polyDelays = np.zeros((Nphases, self.Ntaps)) + def map_to_delay_line(self): + delayLine = np.flip(self.polyDelays.T, axis=0).reshape(-1) + return delayLine + + def map_to_poly_delays(self, delayLine): + self.polyDelays = np.flip(delayLine.reshape((self.Ntaps, self.Nphases)).T, axis=1) + + def shift_in_data(self, inData): + """Shift block of data into polyDelays structure. + + View polyDelays as delay line if L < Nphases. If L = Nphases, then it + is possible to shift the data directly into the first column of + polyDelays. + """ + L = len(inData) + if L < self.Nphases: + delayLine = self.map_to_delay_line() + # Equivalent code: + # delayLine = np.concatenate((delayLine[L:], inData)) + delayLine = np.roll(delayLine, -L) + delayLine[-L:] = inData + self.map_to_poly_delays(delayLine) + else: + # Equivalent code for L == Nphases: Shift in inData block directly + # at column 0 + self.polyDelays = np.roll(self.polyDelays, 1, axis=1) + self.polyDelays[:, 0] = inData + def poly_coeffs(self, commutator): """Polyphase structure for FIR filter coefficients coefs in Nphases @@ -156,21 +196,22 @@ class PolyPhaseFirFilterStructure: """Filter block of inData per poly phase Input: - . inData: block of Nphases input data, time index n increments, so - inData[0] is oldest data and inData[Nphases-1] is newest data. + . inData: block of input data with length <= Nphases, time index n increments, + so inData[0] is oldest data and inData[-1] is newest data. The inData + block size is >= 1 (is full rate) and <= Nphases (for maximally down + converted) Return: . outData: block of Nphases filtered output data with outData[0] is oldest data and outData[Nphases-1] is newest data, so time index n increments, like with inData. """ - # Shift delay memory by one block - self.polyDelays = np.roll(self.polyDelays, 1, axis=1) - # Shift in inData block at column 0 - self.polyDelays[:, 0] = inData + # Shift in one block of input data (1 <= len(inData) <= Nphases) + self.shift_in_data(inData) # Apply FIR coefs per element zData = self.polyDelays * self.polyCoefs # Sum FIR taps per poly phase outData = np.sum(zData, axis=1) + # Output block of Nphases filtered output data return outData @@ -468,7 +509,10 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): Implement maximal downsampling down converter for one bin [HARRIS Fig 6.14]. - The downsampling is maximal so Ndown = Ndft is number of frequency bins, is DFT size. + The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of frequency bins, is DFT size. The downsampling + is maximal so Ndown = Ndft. The polyphase structure has Nphases = Ndown branches so the input x data that shifts + in remains in each branch. Therefore each branch can be FIR filtered independently for the whole input x using + polyX. . see downsample() @@ -525,6 +569,73 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): return y +def nonmaximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1): + """BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown [HARRIS Fig 6.14] + + Implement nonmaximal downsampling down converter for one bin, extend [HARRIS Fig 6.14]. + + The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of frequency bins, is DFT size. The polyphase + FIR structure has Nphases = Ndft branches, to support any Ndown <= Ndft. The input data shifts in per Ndown + samples, so it appears in different branches when Ndown < Ndft. Therefore the input data cannot be FIR filtered + per branch for the whole input x. Instead it needs to be FIR filtered per block of Ndown input samples from x, + using pfs.polyDelays in pfs.filter_block(). + + . see downsample() + + Input: + . x: Input signal x[n] + . Ndown: downsample factor + . k: Index of BPF center frequency w_k = 2 pi k / Ndft + . Ndft: DFT size, number of polyphases in FIR structure + . coefs: prototype FIR filter coefficients for anti aliasing BPF + - verbosity: when > 0 print() status, else no print() + Return: + . y: Downsampled and down converted output signal y[m], m = n // D for bin + k. Complex baseband signal. + """ + a = [1.0] # FIR b = coefs, a = 1 + + # Polyphase implementation to avoid calculating values that are removed + # coefs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] + # polyCoefs = [[ 3, 7, 11], + # [ 2, 6, 10], + # [ 1, 5, 9], + # [ 0, 4, 8]]) + pfs = PolyPhaseFirFilterStructure(Ndft, coefs, commutator='down') + polyCoefs = pfs.polyCoefs + + # Poly phases for whole data signal x, prepended with Ndown - 1 zeros + polyX, Nx, Nxp = poly_structure_data_for_downsampling_whole_x(x, Ndown) # size (Ndown, Nxp), Ny = Nxp + + # Filter x per polyphase, order in polyCoefs accounts for commutator [HARRIS Fig 6.12, 6.13] + polyY = np.zeros((Ndown, Nxp)) + for p in range(Ndown): + polyY[p] = signal.lfilter(polyCoefs[p], a, polyX[p]) + + # Phase rotate per polyphase, due to delay line at branch inputs [HARRIS Eq 6.8] + # . polyY can use index p, because order in polyY accounts for commutator, + # . phase rotator needs to use pCommutator to account for commutator, to fit + # order in polyY and polyCoefs + polyYC = np.zeros((Ndown, Nxp), dtype='cfloat') + for p in range(Ndown): + pCommutator = Ndown - 1 - p + polyYC[p] = polyY[p] * np.exp(1j * 2 * np.pi * pCommutator * k / Ndown) + + # Sum the branch outputs to get single downsampled and downconverted output value + y = np.sum(polyYC, axis=0) + + if verbosity: + print('> downsample_bpf():') + print('. len(x) =', str(len(x))) + print('. Ndown =', str(Ndown)) + print('. Nx =', str(Nx)) + print('. Nxp =', str(Nxp)) + print('. len(y) =', str(len(y))) # = Nxp + print('. k =', str(k)) + print('') + return y + + def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate and decimate by Nup / Ndown """Resample x by factor U / D = Nup / Ndown