diff --git a/applications/lofar2/model/rtdsp/multirate.py b/applications/lofar2/model/rtdsp/multirate.py index c3991477a4a4a85ed11847c57320a7676dc612c8..1a8fd12998ffa92b3551d62f28dbc45a70ee41ea 100644 --- a/applications/lofar2/model/rtdsp/multirate.py +++ b/applications/lofar2/model/rtdsp/multirate.py @@ -50,6 +50,10 @@ def zeros(shape, cmplx=False): return np.zeros(shape) +def ones(shape, cmplx=False): + return zeros(shape, cmplx) + 1 + + def down(x, D, phase=0): """Downsample x[n] by factor D, xD[m] = x[m D], m = n // D @@ -140,7 +144,7 @@ class PolyPhaseFirFilterStructure: delayLine = [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11] For polyDelays shift in from left-top (n = 15 is newest input sample) - and shift-out from right-bottom (n = 4 is oldest sample). + and shift out from right-bottom (n = 4 is oldest sample). v v polyDelays = [[0, 4, 8], ((15,11, 7), @@ -204,7 +208,7 @@ class PolyPhaseFirFilterStructure: def map_to_poly_delays(self, delayLine): self.polyDelays = delayLine.reshape((self.Ntaps, self.Nphases)).T - def shift_in_data(self, inData, flipped=False): + def shift_in_data(self, inData, flipped=True): """Shift block of data into the polyDelays structure. View polyDelays as delay line if L < Nphases. Shift in from the left @@ -234,22 +238,33 @@ class PolyPhaseFirFilterStructure: self.polyDelays = np.roll(self.polyDelays, 1, axis=1) self.polyDelays[:, 0] = xData - def filter_block(self, inData, flipped=False): - """Filter block of inData per polyphase. + def filter_block(self, inData, sampling, flipped=True): + """Filter block of input data per polyphase for downsampling or for + upsampling. Input: - . inData: block of input data with length <= Nphases, time index n in - inData[n] increments, so inData[0] is oldest data and inData[-1] is - newest data. The inData block size is = 1 for upsampling or > 1 - and <= Nphases for downsampling. + . inData: block of input data for downsampling or single input data for + upsampling. + . sampling: 'down' or 'up' + - 'down': for downsampling inData[n] is a block of length L time + samples, with 1 <= L <= Nphases [HARRIS Fig. 6.12, 6.13]. The time + index n in inData[n] increments, so inData[0] is oldest data and + inData[-1] is newest data when flipped is False. + - 'up': for upsampling inData length L = 1 sample [HARRIS Fig. 7.11, + 7.12]. The inData is fanout to the Nphases. + . flipped: False then inData order is inData[n] for downsampling and + still needs to be flipped. If True then the inData is already + flipped. Not used for upsampling. Return: - . pfsData: block of polyphase FIR filtered output data for Nphase, with + . pfsData: block of polyphase FIR filtered output data for Nphases, with pfsData[p] and p = 0:Nphases-1 from top to bottom. - . flipped: False then inData order is inData[n] and still needs to be - flipped. If True then the inData is already flipped. """ # Shift in one block of input data (1 <= len(inData) <= Nphases) - self.shift_in_data(inData, flipped) + if sampling == 'down': + self.shift_in_data(inData, flipped) + else: # 'up' + inWires = inData * ones(self.Nphases, np.iscomplexobj(inData)) + self.shift_in_data(inWires, flipped=True) # Apply FIR coefs per delay element zData = self.polyDelays * self.polyCoefs # Sum FIR taps per polyphase @@ -299,7 +314,7 @@ def polyphase_data_for_downsampling_whole_x(x, Ndown, Nzeros): # . Store data in time order per branch, so with oldest data left, to match # use with lfilter(). Note this differs from order of tap data in # pfs.polyDelays where newest data is left, because the block data is - # flipped by shift_in_data(), to match use in pfs.filter_block, so that + # flipped by shift_in_data(), to match use in pfs.filter_block(), so that # it can use pfs.polyDelays * pfs.polyCoefs to filter the block. # . The newest sample x[-1] has to be in top branch of polyX and the oldest # sample x[0] in bottom branch, to match branch order of 0:Ndown-1 from @@ -623,37 +638,45 @@ def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate an ############################################################################### -# Single bandpass channel up and down sampling and up and down conversion +# Single bandpass channel up and downsampling and up and downconversion ############################################################################### -def unit_circle_loops_phasor_arr(k, N, sign): +def unit_circle_loops_phasor_arr(k, N, sign=1): """Return array of N phasors on k loops along the unit circle. Polyphase dependent phase offsets for bin k, when N = Ndown = Ndft [HARRIS Eq 6.8]. For k = 1 this yields the roots of unity. + Input: + . k: bin in range(N) + . N: number of phasors + . sign: +1 or -1 term in exp() Return: - . pArr: exp(+-j 2pi k / N) for k in 0 : N - 1 + . pArr: exp(sign 2j pi k / N) for k in 0 : N - 1 """ - if sign == 'positive': - pArr = np.array([np.exp(2j * np.pi * p * k / N) for p in range(N)]) - else: # 'negative' - pArr = np.array([np.exp(-2j * np.pi * p * k / N) for p in range(N)]) + pArr = np.array([np.exp(sign * 2j * np.pi * p * k / N) for p in range(N)]) return pArr -def time_shift_phasor_arr(k, Ndown, Ndft, Msamples): +def time_shift_phasor_arr(k, M, Ndft, Msamples, sign): """Return array of Msamples phasors in time to compensate for oversampling time shift. - The time shift due to downsampling causes a frequency component of - k * Ndown / Ndft. With oversampling Ndown < Ndft, and then after - downsampling there remains a frequency offset [HARRIS Eq 9.3]. + The time shift due to down or upsampling causes a frequency component of + k * M / Ndft. With oversampling M < Ndft, and then after down or upsampling + there remains a frequency offset that can be compensated by given by mArr + [HARRIS Eq 9.3]. + Input: + . k: Index of BPF center frequency w_k = 2 pi k / Ndft + . M: downsample or upsample factor + . Ndft: DFT size, number of bins and number of polyphases in PFS FIR filter + . Msamples: Requested number of output samples in time + . sign: +1 or -1 term in exp() Return: - . mArr = exp(2j pi k * Ndown / Ndft * m), for m in 0 : Msamples - 1 + . mArr = exp(sign 2j pi k * M / Ndft * m), for m in 0 : Msamples - 1 """ - mArr = np.exp(-2j * np.pi * k * Ndown / Ndft * np.arange(Msamples)) + mArr = np.exp(sign * 2j * np.pi * k * M / Ndft * np.arange(Msamples)) return mArr @@ -661,14 +684,15 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): """BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown and Ndown = Ndft. - Implement maximal downsampling down converter for one bin (= critically + Implement maximal downsampling downconverter for one bin (= critically sampled) [HARRIS Fig 6.14]. The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of frequency bins, is DFT size. The downsampling is maximal so Ndown = Ndft. The polyphase structure has Nphases = Ndown branches, so the input x data that shifts in remains in each branch. Therefore each branch can be - FIR filtered independently for the whole input x using polyX. + FIR filtered independently for the whole input x using + polyphase_frontend(). Input: . x: Input signal x[n] @@ -677,7 +701,7 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): . coefs: prototype FIR filter coefficients for anti aliasing BPF - verbosity: when > 0 print() status, else no print() Return: - . yc: Downsampled and down converted output signal yc[m], m = n // D for + . yc: Downsampled and downconverted output signal yc[m], m = n // D for bin k. Complex baseband signal. """ # Polyphase FIR filter input x @@ -685,14 +709,14 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): # Phase rotate per polyphase for bin k, due to delay line at branch inputs # [HARRIS Eq 6.8] - kPhasors = unit_circle_loops_phasor_arr(k, Ndown, 'positive') - polyYC = np.zeros((Ndown, Nxp), dtype='cfloat') + kPhasors = unit_circle_loops_phasor_arr(k, Ndown) + polyYc = np.zeros((Ndown, Nxp), dtype='cfloat') for p in range(Ndown): - polyYC[p] = polyY[p] * kPhasors[p] # row = row * scalar + polyYc[p] = polyY[p] * kPhasors[p] # row = row * scalar # Sum the branch outputs to get single downsampled and downconverted output # complex baseband value yc. - yc = np.sum(polyYC, axis=0) + yc = np.sum(polyYc, axis=0) if verbosity: print('> Log maximal_downsample_bpf():') @@ -706,60 +730,65 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): return yc -def maximal_upsample_bpf(x, Nup, k, coefs, verbosity=1): - """BPF x at bin k in range(Ndft) and downsample x by factor D = Nup - = Ndft. +def maximal_upsample_bpf(xBase, Nup, k, coefs, verbosity=1): + """BPF xBase at bin k in range(Ndft), and upsample xBase by factor U = Nup = + Ndft. Implement maximal upsampling upconverter for one bin (= critically sampled) [HARRIS Fig. 7.16]. The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of - frequency bins, is DFT size. The upsampling is maximal so Nup = Ndft. - The polyphase structure has Nphases = Nup branches, so the input x - data that shifts in remains in each branch. Therefore each branch can be - FIR filtered independently for the whole input x using polyX. + frequency bins, is DFT size. The upsampling is maximal so Nup = Ndft. The + polyphase structure has Nphases = Nup branches, so the output yc data + shifts out from the same branch for each block of Nup samples. Therefore + each branch can be FIR filtered independently for the whole input xBase + using polyphase_frontend(). Input: - . x: Input signal x[n] + . xBase: Input equivalent baseband signal . Nup: upsample factor . k: Index of BPF center frequency w_k = 2 pi k / Nup . coefs: prototype FIR filter coefficients for anti aliasing and interpolating BPF - verbosity: when > 0 print() status, else no print() Return: - . yc: Upsampled and upconverted output signal yc[n], n = m D for - bin k. Complex signal, so positive frequencies only. + . yc: Upsampled and upconverted output signal yc[n], n = m U at + intermediate frequency (IF) of bin k. Complex positive frequencies + only signal. """ - # Polyphase FIR filter input x - polyY, Nx, Nxp = polyphase_frontend(x, Nup, coefs, 'up') + # Polyphase FIR filter input xBase + polyY, Nx, Nxp = polyphase_frontend(xBase, Nup, coefs, 'up') # Phase rotate per polyphase for bin k, due to delay line at branch inputs # [HARRIS Eq 7.8 = 6.8] - kPhasors = unit_circle_loops_phasor_arr(k, Nup, 'positive') - polyYC = np.zeros((Nup, Nxp), dtype='cfloat') + kPhasors = unit_circle_loops_phasor_arr(k, Nup) + polyYc = np.zeros((Nup, Nxp), dtype='cfloat') for p in range(Nup): - polyYC[p] = polyY[p] * kPhasors[p] # row = row * scalar + polyYc[p] = polyY[p] * kPhasors[p] # row = row * scalar - # Output Nup samples yc[m] for every input sample x[n] - yc = polyYC.T.reshape(1, Nup * Nx)[0] + # Output Nup samples yc[m] for every input sample xBase[n] + yc = polyYc.T.reshape(1, Nup * Nx)[0] if verbosity: print('> Log maximal_downsample_bpf():') - print(' . len(x) =', str(len(x))) - print(' . Nx =', str(Nx)) - print(' . Nxp =', str(Nxp)) - print(' . len(yc) =', str(len(yc))) # = Nxp - print(' . Nup =', str(Nup)) - print(' . k =', str(k)) + print(' . len(xBase) =', str(len(xBase))) + print(' . Nx =', str(Nx)) + print(' . Nxp =', str(Nxp)) + print(' . len(yc) =', str(len(yc))) # = Nxp + print(' . Nup =', str(Nup)) + print(' . k =', str(k)) print('') return yc def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1): - """BPF x at bin k in range(Ndown) and downsample x by factor D = Ndown + """BPF x at bin k in range(Ndown), and downsample x by factor D = Ndown. - Implement nonmaximal downsampling down converter for one bin, extend - [HARRIS Fig 6.14]. + Implement nonmaximal downsampling downconverter for one bin. The maximal + downsampling downconverter for one bin has the kPhasors per polyphase + branch of [HARRIS Eq. 6.8 and Fig 6.14]. For nonmaximal this needs to be + extended with the tPhasors per polyphase branch of [HARRIS Eq. 9.3, to + compensate for the oversampling time shift. The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of frequency bins, is DFT size. The polyphase FIR structure has Nphases = Ndft @@ -779,7 +808,7 @@ def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1): . coefs: prototype LPF FIR filter coefficients for anti aliasing BPF - verbosity: when > 0 print() status, else no print() Return: - . yc: Downsampled and down converted output signal yc[m], m = n // D for + . yc: Downsampled and downconverted output signal yc[m], m = n // D for bin k. Complex baseband signal. """ # Prepend x with Ndown - 1 zeros, and represent x in Nblocks of Ndown @@ -793,15 +822,15 @@ def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1): # PFS with Ndft polyphases pfs = PolyPhaseFirFilterStructure(Ndft, coefs) - kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 'positive') # [HARRIS Eq 6.8] + kPhasors = unit_circle_loops_phasor_arr(k, Ndft) # [HARRIS Eq 6.8] # Oversampling time shift compensation via frequency dependent phase shift - tPhasors = time_shift_phasor_arr(k, Ndown, Ndft, Nblocks) # [HARRIS Eq 9.3] + tPhasors = time_shift_phasor_arr(k, Ndown, Ndft, Nblocks, -1) # [HARRIS Eq 9.3] for b in range(Nblocks): # Filter block inData = xBlocks[:, b] - pfsData = pfs.filter_block(inData, flipped=True) + pfsData = pfs.filter_block(inData, 'down', flipped=True) # Phase rotate polyphases for bin k pfsBinData = pfsData * kPhasors * tPhasors[b] # [HARRIS Eq 6.8, 9.3] # Sum the polyphases to get single downsampled and downconverted output @@ -819,3 +848,79 @@ def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1): print(' . k =', str(k)) print('') return yc + + +def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1): + """BPF xBase at bin k in range(Nup), and upsample xBase by factor U = Nup. + + Implement nonmaximal upsampling upconverter for one bin. The maximal + upsampling upconverter for one bin has the kPhasors per polyphase + branch similar of [HARRIS Fig 7.16]. For nonmaximal this needs to be + extended with the tPhasors per polyphase branch similar as for down in + [HARRIS Eq. 9.3], to compensate for the oversampling time shift. + + The BPF is centered at w_k = 2pi k / Ndft, where Ndft is number of + frequency bins, is DFT size. The polyphase FIR structure has Nphases = Ndft + branches, to fit the requested number of bins. The polyphase FIR structure + is maximally upsampled (= critically sampled) for Nup = Ndft, but it + can support any Nup <= Ndft. The output data shifts out per Nup samples, + so it appears from different branches when Nup < Ndft and a new block is + shifted out. Therefore the output data cannot be FIR filtered per branch + for the whole input xBase. Instead it needs to be FIR filtered per block of + Nup output samples, using pfs.polyDelays in pfs.filter_block(). + + Input: + . xBase: Input equivalent baseband signal xBase[m] + . Nup: upsample factor + . k: Index of BPF center frequency w_k = 2 pi k / Ndft + . Ndft: DFT size, number of polyphases in PFS FIR filter + . coefs: prototype LPF FIR filter coefficients for anti aliasing and + interpolationBPF + - verbosity: when > 0 print() status, else no print() + Return: + . yc: Upsampled and upconverted output signal yc[n], n = m U at + intermediate frequency (IF) of bin k. Complex positive frequencies + only signal. + """ + Nblocks = len(xBase) + + # Prepare output + polyYc = np.zeros((Nup, Nblocks), dtype='cfloat') + + # PFS with Ndft polyphases + pfs = PolyPhaseFirFilterStructure(Ndft, coefs, cmplx=True) + kPhasors = unit_circle_loops_phasor_arr(k, Ndft) # [HARRIS Eq 7.8] + + # Oversampling time shift compensation via frequency dependent phase shift + tPhasors = time_shift_phasor_arr(k, Nup, Ndft, Nblocks, -1) # [HARRIS Eq 9.3] + + # Polyphase FIR filter input xBase + nLo = 0 + for b in range(Nblocks): + # Filter block + inData = xBase[b] + pfsData = Nup * pfs.filter_block(inData, 'up') + # Phase rotate polyphases for bin k + pfsBinData = pfsData * kPhasors * tPhasors[b] # [HARRIS Eq 7.8, 9.3] + # Output Nup samples yc[n] for every input sample xBase[m] + nEnd = nLo + Nup + if nEnd <= Ndft: + outBinData = pfsBinData[nLo : nEnd] + else: + outBinData = np.concatenate((pfsBinData[nLo : Ndft], + pfsBinData[0 : nEnd - Ndft])) + nLo = nEnd % Ndft + polyYc[:, b] = outBinData + + yc = polyYc.T.reshape(1, Nup * Nblocks)[0] + + if verbosity: + print('> non_maximal_upsample_bpf():') + print(' . len(xBase) =', str(len(xBase))) + print(' . Nblocks =', str(Nblocks)) + print(' . len(yc) =', str(len(yc))) # = Nblocks + print(' . Nup =', str(Nup)) + print(' . Ndft =', str(Ndft)) + print(' . k =', str(k)) + print('') + return yc