diff --git a/applications/lofar2/model/rtdsp/multirate.py b/applications/lofar2/model/rtdsp/multirate.py index e42f426554b74eaa0da1d5fcf7365e7cb6f0925d..979e1662921bc336e4a672a835e9cd978506c38a 100644 --- a/applications/lofar2/model/rtdsp/multirate.py +++ b/applications/lofar2/model/rtdsp/multirate.py @@ -180,6 +180,7 @@ class PolyPhaseFirFilterStructure: self.Ncoefs = len(coefs) self.Nphases = Nphases # branches, rows self.Ntaps = ceil_div(self.Ncoefs, Nphases) # taps, columns + self.Ndelays = self.Ntaps * self.Nphases # zero padded coefs self.cmplx = cmplx self.init_poly_coeffs() self.reset_poly_delays() @@ -205,9 +206,9 @@ class PolyPhaseFirFilterStructure: Nphases = 4 rows (axis=0) Ntaps = 3 columns (axis=1) """ - polyCoefs = np.zeros(self.Nphases * self.Ntaps) # real - polyCoefs[0 : self.Ncoefs] = self.coefs - self.polyCoefs = polyCoefs.reshape((self.Ntaps, self.Nphases)).T + coefsLine = np.zeros(self.Ndelays) # real + coefsLine[0 : self.Ncoefs] = self.coefs + self.polyCoefs = coefsLine.reshape((self.Ntaps, self.Nphases)).T def reset_poly_delays(self): self.polyDelays = zeros((self.Nphases, self.Ntaps), self.cmplx) @@ -239,7 +240,7 @@ class PolyPhaseFirFilterStructure: """ L = len(inData) xData = inData if flipped else np.flip(inData) - if L < self.Nphases: + if L != self.Nphases: delayLine = self.map_to_delay_line() # Equivalent code: # delayLine = np.concatenate((inData, delayLine[L:])) @@ -252,7 +253,7 @@ class PolyPhaseFirFilterStructure: self.polyDelays = np.roll(self.polyDelays, 1, axis=1) self.polyDelays[:, 0] = xData - def load_in_data(self, inData, flipped): + def parallel_load(self, inData, flipped): """Load block of data into each tap of the polyDelays structure. Length L of inData is L = Nphases, so load the data directly at @@ -272,11 +273,19 @@ class PolyPhaseFirFilterStructure: def filter_block(self, inData, flipped): """Filter block of inData per polyphase. + For upsampling the inData should contain Nphases = Nup copies of the + input time sample. + For downsampling the inData should contain Nphases = Ndown input time + samples. + For a single channel downconverter or analysis DFT filterbank the + inData contains Ndown input time samples, with Ndown <= Nphases = Ndft. + For a single channel upconverter or synthesis DFT filterbank the + inData contains Ndft time samples, with Ndft >= Nphases = Nup. + Input: - . inData: block of input data with length <= Nphases, time index n in + . inData: block of input data with length L. The time index n in inData[n] increments, so inData[0] is oldest data and inData[-1] is - newest data. The inData block size is = 1 for upsampling or > 1 - and <= Nphases for downsampling. + newest data. . flipped: False then inData order is inData[n] with newest sample last and still needs to be flipped. If True then the inData is already flipped in time, so with newest sample at inData[0]. @@ -284,31 +293,15 @@ class PolyPhaseFirFilterStructure: . pfsData: block of polyphase FIR filtered output data for Nphases, with pfsData[p] and p = 0:Nphases-1 from top to bottom. """ - # Shift in one block of input data (1 <= len(inData) <= Nphases) + # Shift in one block of input data self.shift_in_data(inData, flipped) # Apply FIR coefs per delay element - zData = self.polyDelays * self.polyCoefs + zPoly = self.polyDelays * self.polyCoefs # Sum FIR taps per polyphase - pfsData = np.sum(zData, axis=1) + pfsData = np.sum(zPoly, axis=1) # Output block of Nphases filtered output data return pfsData - def filter_up(self, inSample): - """Filter same input sample at each polyphase, to upsample it by factor - U = Nphases. - - Input: - . inSample: input sample that is applied to each polyphase - Return: - . pfsData: block of polyphase FIR filtered output data for Nphases, - with pfsData[p] and p = 0:Nphases-1 from top to bottom. The - pfsData[0] is the first and thus oldest and pfsData[-1] is the last - and thus newest interpolated data. - """ - inWires = inSample * ones(self.Nphases, np.iscomplexobj(inSample)) - pfsData = self.filter_block(inWires, flipped=True) - return pfsData - ############################################################################### # Up, down, resample low pass input signal @@ -332,6 +325,7 @@ def polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros): Return: . polyX: polyphase data structure with size (Ndown, Nxp) for Ndown branches and Nxp samples from x per branch. + . lineX: Nx samples from x prepended with Nzeros, same content as polyX . Nx: Total number of samples from x, including prepended Ndown - 1 zeros. . Nxp: Total number of samples used from x per polyphase branch, is the number of samples Ny, that will be in downsampled output y[m] = x[m D], @@ -345,8 +339,8 @@ def polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros): # . prepend x with Nzeros zeros # . skip any last remaining samples from x, that are not enough yield a new # output FIR sum. - polyX = zeros(Ndown * Nxp, np.iscomplexobj(x)) - polyX[Nzeros:] = x[0 : Nx] + lineX = zeros(Ndown * Nxp, np.iscomplexobj(x)) + lineX[Nzeros:] = x[0 : Nx] # . Store data in time order per branch, so with oldest data left, to match # use with lfilter(). Note this differs from order of tap data in # pfs.polyDelays where newest data is left, because the block data is @@ -369,8 +363,8 @@ def polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros): # [3, 7,11]] (12, 8, 4)) (0, 4, 8,12)) # v | # oldest - polyX = np.flipud(polyX.reshape(Nxp, Ndown).T) - return polyX, Nx, Nxp + polyX = np.flipud(lineX.reshape(Nxp, Ndown).T) + return polyX, lineX, Nx, Nxp def polyphase_frontend(x, Nphases, coefs, sampling): @@ -423,7 +417,7 @@ def polyphase_frontend(x, Nphases, coefs, sampling): # length of each branch. Ndown = Nphases Nzeros = Ndown - 1 - polyX, Nx, Nxp = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros) + polyX, _, Nx, Nxp = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros) # print(polyX[:, 0]) # Filter Ndown parts of x per polyphase, because the FIR filter output # y will sum. The commutator index order for downsampling is p = @@ -859,7 +853,7 @@ def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1): # Prepend x with Ndown - 1 zeros, and represent x in Nblocks of Ndown samples Nzeros = Ndown - 1 - xBlocks, Nx, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros) + xBlocks, _, Nx, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros) # Prepare output yc = np.zeros(Nblocks, dtype='cfloat') @@ -935,38 +929,39 @@ def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1): only signal. """ Ros = Ndft // Nup - if Ndft % Nup: - print('WARNING: Only support integer Ros') - return None + # if Ndft % Nup: + # print('WARNING: Only support integer Ros') + # return None Nblocks = len(xBase) # Prepare output polyYc = np.zeros((Nup, Nblocks), dtype='cfloat') - # PFS with Ndft polyphases - pfs = PolyPhaseFirFilterStructure(Ndft, coefs, cmplx=True) - kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) # [HARRIS Eq 7.8] + # Adjust LO phase for group delay of analysis LPF and synthesis LPF in + # series. This is not needed in practise, but is needed to be able to + # exactly compare reconstructed signal with original input time series + # signal. Assume coefs has group delay (len(coefs) - 1) / 2. + hPairGroupDelay = len(coefs) - 1 + hPhasor = np.exp(-2j * np.pi * k / Ndft * hPairGroupDelay) # Oversampling time shift compensation via frequency dependent phase shift tPhasors = time_shift_phasor_arr(k, Nup, Ndft, Nblocks, -1) # [HARRIS Eq 9.3] + # PFS with Ndft polyphases + pfs = PolyPhaseFirFilterStructure(Ndft, coefs, cmplx=True) + kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) # [HARRIS Eq 7.8] + # Polyphase FIR filter input xBase for b in range(Nblocks): - # Filter block - inSample = xBase[b] - pfsData = Nup * pfs.filter_up(inSample) # Phase rotate polyphases for bin k - pfsBinData = pfsData * kPhasors - # Sum Ros interleaved rows to get Nup rows [HARRIS Fig 9.15, - # CROCHIERE Fig 7.16]] - pfsUpData = pfsBinData.reshape((Ros, Nup)).T - # pfsUpData = pfsBinData.reshape((Nup, Ros)) - pfsUpData = np.sum(pfsUpData, axis=1) - # pfsUpData = Ros * pfsUpData[:, 0] + inData = xBase[b] * hPhasor * tPhasors[b] * kPhasors + + # Filter block [HARRIS Fig 9.15, CROCHIERE Fig 7.16]] + pfsData = pfs.filter_block(inData, flipped=True) + # Output Nup samples yc[n] for every input sample xBase[m] - outUpData = pfsUpData * tPhasors[b] - polyYc[:, b] = outUpData + polyYc[:, b] = Ndft * pfsData yc = polyYc.T.reshape(1, Nup * Nblocks)[0] @@ -1030,7 +1025,7 @@ def analysis_dft_filterbank(x, Ndown, Ndft, coefs, commutator, verbosity=1): # samples Nzeros = Ndown - 1 # Nzeros = 0 - xBlocks, Nx, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros) + xBlocks, xData, Nx, Nblocks = polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros) # Prepare output Yc = np.zeros((Ndft, Nblocks), dtype='cfloat') @@ -1038,25 +1033,53 @@ def analysis_dft_filterbank(x, Ndown, Ndft, coefs, commutator, verbosity=1): # PFS with Ndft polyphases pfs = PolyPhaseFirFilterStructure(Ndft, coefs) + wola = True + # wola = False # Oversampling DFT filterbank - for b in range(Nblocks): - # Filter block - inData = xBlocks[:, b] - pfsData = pfs.filter_block(inData, flipped=True) - - # Apply time (m) and bin (k) dependend phase shift W_K^(kmM) by circular - # time shift of DFT input - tShift = (b * Ndown) % Ndft - pfsShifted = np.roll(pfsData, -tShift) - - # For 'ccw' apply IDFT, for 'cw' apply DFT - if commutator == 'cw': # DFT - # For 'cw' fold the pfsData to apply the DFT - pfsShifted = fold(pfsShifted) - Yc[:, b] = np.fft.fft(pfsShifted) - else: # 'ccw', IDFT - # With 'ccw' keep the pfsData to apply IDFT + if wola: + # WOLA structure + # . prepend with zeros to aligne with PFS implementation + xData = np.concatenate((np.zeros(pfs.Ndelays - Ndown), xData)) + xData = np.concatenate((xData, np.zeros(Ndft))) + print(pfs.Ndelays) + for b in range(Nblocks): + # Window + tRange = np.arange(pfs.Ndelays) + b * Ndown + pfs.shift_in_data(xData[tRange], flipped=False) + # print(pfs.polyDelays) + zPoly = pfs.polyDelays * pfs.polyCoefs + # Sum pfs.Ntaps number of blocks + pfsData = np.sum(zPoly, axis=1) + # Time shift + tShift = (b * Ndown - 0) % Ndft + pfsShifted = np.roll(pfsData, -tShift) + # DFT + # pfsShifted = fold(pfsShifted) + # pfsShifted = np.flip(pfsShifted) + # Yc[:, b] = np.fft.fft(pfsShifted) Yc[:, b] = np.fft.ifft(pfsShifted) * Ndft + else: + # PFS + for b in range(Nblocks): + # Filter block + inData = xBlocks[:, b] + pfsData = pfs.filter_block(inData, flipped=True) + print(pfs.polyDelays) + + # Time (m) and bin (k) dependend phase shift W_K^(kmM) by circular + # time shift of DFT input + tShift = (b * Ndown) % Ndft + pfsShifted = np.roll(pfsData, -tShift) + + # For 'ccw' apply IDFT, for 'cw' apply DFT(fold()), due to fold() both + # yield identical result + if commutator == 'cw': # DFT + # For 'cw' fold the pfsData to apply the DFT + pfsShifted = fold(pfsShifted) + Yc[:, b] = np.fft.fft(pfsShifted) + else: # 'ccw', IDFT + # With 'ccw' keep the pfsData to apply IDFT + Yc[:, b] = np.fft.ifft(pfsShifted) * Ndft if verbosity: print('> analysis_dft_filterbank():') @@ -1095,11 +1118,11 @@ def synthesis_dft_filterbank(Xbase, Nup, Ndft, coefs, commutator, verbosity=1): # Xbase has Ndft rows and Nblocks columns Nblocks = np.size(Xbase, axis=1) - # PFS with Ndft polyphases + # Use PFS with Ndft polyphases for delay line and window pfs = PolyPhaseFirFilterStructure(Ndft, coefs) # Prepare output - Noutput = Nup * Nblocks + pfs.Ncoefs + Noutput = Nup * Nblocks + pfs.Ndelays y = np.zeros(Noutput, dtype='float') # Adjust LO phase for group delay of analysis LPF and synthesis LPF in @@ -1110,7 +1133,8 @@ def synthesis_dft_filterbank(Xbase, Nup, Ndft, coefs, commutator, verbosity=1): # Oversampling DFT filterbank for b in range(Nblocks): - # For 'ccw' apply DFT, for 'cw' apply IDFT + # For 'ccw' apply IDFT, for 'cw' apply DFT(fold()), due to fold() both + # yield identical result if commutator == 'cw': # DFT # For 'cw' fold the Xbase to apply the DFT xTime = np.real(np.fft.fft(fold(Xbase[:, b]))) @@ -1124,15 +1148,15 @@ def synthesis_dft_filterbank(Xbase, Nup, Ndft, coefs, commutator, verbosity=1): xShifted = np.roll(xTime, -tShift) # Load xTime at each tap in polyDelays - pfs.load_in_data(xShifted, flipped=True) + pfs.parallel_load(xShifted, flipped=True) # Apply FIR coefs per delay element zPoly = Nup * pfs.polyDelays * pfs.polyCoefs - zData = pfs.map_to_delay_line(zPoly) + zLine = pfs.map_to_delay_line(zPoly) # Overlap add weigthed input to the output - tRange = np.arange(len(zData)) + b * Nup - y[tRange] += zData + tRange = np.arange(pfs.Ndelays) + b * Nup + y[tRange] += zLine if verbosity: print('> synthesis_dft_filterbank():')