diff --git a/applications/lofar2/model/rtdsp/multirate.py b/applications/lofar2/model/rtdsp/multirate.py index 1a8fd12998ffa92b3551d62f28dbc45a70ee41ea..4f58598b8754ea4780bf8a8b1088aa5ba9ef4c0c 100644 --- a/applications/lofar2/model/rtdsp/multirate.py +++ b/applications/lofar2/model/rtdsp/multirate.py @@ -95,7 +95,7 @@ class PolyPhaseFirFilterStructure: output samples for every input sample [HARRIS Fig 7.11, 7.12, CROCHIERE Fig 3.25]. - Downsampling uses the PFS as a Direct-Form FIR filter, where the - commutator selects the polyphases from phase Nphase - 1 downto 0 to sum + commutator selects the polyphases from phase Nphases - 1 downto 0 to sum samples from the past for one output sample [HARRIS Fig 6.12, 6.13, CROCHIERE Fig 3.29]. - In both cases the commutator rotates counter clockwise, because the PFS @@ -208,7 +208,7 @@ class PolyPhaseFirFilterStructure: def map_to_poly_delays(self, delayLine): self.polyDelays = delayLine.reshape((self.Ntaps, self.Nphases)).T - def shift_in_data(self, inData, flipped=True): + def shift_in_data(self, inData, flipped): """Shift block of data into the polyDelays structure. View polyDelays as delay line if L < Nphases. Shift in from the left @@ -238,33 +238,23 @@ class PolyPhaseFirFilterStructure: self.polyDelays = np.roll(self.polyDelays, 1, axis=1) self.polyDelays[:, 0] = xData - def filter_block(self, inData, sampling, flipped=True): - """Filter block of input data per polyphase for downsampling or for - upsampling. + def filter_block(self, inData, flipped): + """Filter block of inData per polyphase. Input: - . inData: block of input data for downsampling or single input data for - upsampling. - . sampling: 'down' or 'up' - - 'down': for downsampling inData[n] is a block of length L time - samples, with 1 <= L <= Nphases [HARRIS Fig. 6.12, 6.13]. The time - index n in inData[n] increments, so inData[0] is oldest data and - inData[-1] is newest data when flipped is False. - - 'up': for upsampling inData length L = 1 sample [HARRIS Fig. 7.11, - 7.12]. The inData is fanout to the Nphases. - . flipped: False then inData order is inData[n] for downsampling and - still needs to be flipped. If True then the inData is already - flipped. Not used for upsampling. + . inData: block of input data with length <= Nphases, time index n in + inData[n] increments, so inData[0] is oldest data and inData[-1] is + newest data. The inData block size is = 1 for upsampling or > 1 + and <= Nphases for downsampling. + . flipped: False then inData order is inData[n] with newest sample last + and still needs to be flipped. If True then the inData is already + flipped in time, so with newest sample first. Return: . pfsData: block of polyphase FIR filtered output data for Nphases, with pfsData[p] and p = 0:Nphases-1 from top to bottom. """ # Shift in one block of input data (1 <= len(inData) <= Nphases) - if sampling == 'down': - self.shift_in_data(inData, flipped) - else: # 'up' - inWires = inData * ones(self.Nphases, np.iscomplexobj(inData)) - self.shift_in_data(inWires, flipped=True) + self.shift_in_data(inData, flipped) # Apply FIR coefs per delay element zData = self.polyDelays * self.polyCoefs # Sum FIR taps per polyphase @@ -272,6 +262,22 @@ class PolyPhaseFirFilterStructure: # Output block of Nphases filtered output data return pfsData + def filter_up(self, inSample): + """Filter same input sample at each polyphase, to upsample it by factor + U = Nphases. + + Input: + . inSample: input sample that is applied to each polyphase + Return: + . pfsData: block of polyphase FIR filtered output data for Nphases, + with pfsData[p] and p = 0:Nphases-1 from top to bottom. The + pfsData[0] is the first and thus oldest and pfsData[-1] is the last + and thus newest interpolated data. + """ + inWires = inSample * ones(self.Nphases, np.iscomplexobj(inSample)) + pfsData = self.filter_block(inWires, flipped=True) + return pfsData + ############################################################################### # Up, down, resample low pass input signal @@ -641,7 +647,7 @@ def resample(x, Nup, Ndown, coefs, verify=False, verbosity=1): # interpolate an # Single bandpass channel up and downsampling and up and downconversion ############################################################################### -def unit_circle_loops_phasor_arr(k, N, sign=1): +def unit_circle_loops_phasor_arr(k, N, sign): """Return array of N phasors on k loops along the unit circle. Polyphase dependent phase offsets for bin k, when N = Ndown = Ndft [HARRIS @@ -709,7 +715,7 @@ def maximal_downsample_bpf(x, Ndown, k, coefs, verbosity=1): # Phase rotate per polyphase for bin k, due to delay line at branch inputs # [HARRIS Eq 6.8] - kPhasors = unit_circle_loops_phasor_arr(k, Ndown) + kPhasors = unit_circle_loops_phasor_arr(k, Ndown, 1) polyYc = np.zeros((Ndown, Nxp), dtype='cfloat') for p in range(Ndown): polyYc[p] = polyY[p] * kPhasors[p] # row = row * scalar @@ -760,8 +766,9 @@ def maximal_upsample_bpf(xBase, Nup, k, coefs, verbosity=1): polyY, Nx, Nxp = polyphase_frontend(xBase, Nup, coefs, 'up') # Phase rotate per polyphase for bin k, due to delay line at branch inputs - # [HARRIS Eq 7.8 = 6.8] - kPhasors = unit_circle_loops_phasor_arr(k, Nup) + # [HARRIS Eq 7.8 = 6.8, Fig 7.16], can be applied after the FIR filter, + # because the kPhasors per polyphase are constants. + kPhasors = unit_circle_loops_phasor_arr(k, Nup, 1) polyYc = np.zeros((Nup, Nxp), dtype='cfloat') for p in range(Nup): polyYc[p] = polyY[p] * kPhasors[p] # row = row * scalar @@ -770,7 +777,7 @@ def maximal_upsample_bpf(xBase, Nup, k, coefs, verbosity=1): yc = polyYc.T.reshape(1, Nup * Nx)[0] if verbosity: - print('> Log maximal_downsample_bpf():') + print('> Log maximal_upsample_bpf():') print(' . len(xBase) =', str(len(xBase))) print(' . Nx =', str(Nx)) print(' . Nxp =', str(Nxp)) @@ -822,7 +829,7 @@ def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1): # PFS with Ndft polyphases pfs = PolyPhaseFirFilterStructure(Ndft, coefs) - kPhasors = unit_circle_loops_phasor_arr(k, Ndft) # [HARRIS Eq 6.8] + kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) # [HARRIS Eq 6.8] # Oversampling time shift compensation via frequency dependent phase shift tPhasors = time_shift_phasor_arr(k, Ndown, Ndft, Nblocks, -1) # [HARRIS Eq 9.3] @@ -830,12 +837,12 @@ def non_maximal_downsample_bpf(x, Ndown, k, Ndft, coefs, verbosity=1): for b in range(Nblocks): # Filter block inData = xBlocks[:, b] - pfsData = pfs.filter_block(inData, 'down', flipped=True) + pfsData = pfs.filter_block(inData, flipped=True) # Phase rotate polyphases for bin k - pfsBinData = pfsData * kPhasors * tPhasors[b] # [HARRIS Eq 6.8, 9.3] + pfsBinData = pfsData * kPhasors # [HARRIS Eq 6.8, 9.3] # Sum the polyphases to get single downsampled and downconverted output # value - yc[b] = np.sum(pfsBinData) + yc[b] = np.sum(pfsBinData) * tPhasors[b] if verbosity: print('> non_maximal_downsample_bpf():') @@ -867,7 +874,7 @@ def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1): so it appears from different branches when Nup < Ndft and a new block is shifted out. Therefore the output data cannot be FIR filtered per branch for the whole input xBase. Instead it needs to be FIR filtered per block of - Nup output samples, using pfs.polyDelays in pfs.filter_block(). + Nup output samples, using pfs.polyDelays in pfs.filter_up(). Input: . xBase: Input equivalent baseband signal xBase[m] @@ -888,28 +895,21 @@ def non_maximal_upsample_bpf(xBase, Nup, k, Ndft, coefs, verbosity=1): polyYc = np.zeros((Nup, Nblocks), dtype='cfloat') # PFS with Ndft polyphases - pfs = PolyPhaseFirFilterStructure(Ndft, coefs, cmplx=True) - kPhasors = unit_circle_loops_phasor_arr(k, Ndft) # [HARRIS Eq 7.8] + pfsUp = PolyPhaseFirFilterStructure(Nup, coefs, cmplx=True) + kPhasors = unit_circle_loops_phasor_arr(k, Ndft, 1) # [HARRIS Eq 7.8] # Oversampling time shift compensation via frequency dependent phase shift tPhasors = time_shift_phasor_arr(k, Nup, Ndft, Nblocks, -1) # [HARRIS Eq 9.3] # Polyphase FIR filter input xBase - nLo = 0 for b in range(Nblocks): # Filter block - inData = xBase[b] - pfsData = Nup * pfs.filter_block(inData, 'up') + inSample = xBase[b] + pfsData = Nup * pfsUp.filter_up(inSample) # Phase rotate polyphases for bin k - pfsBinData = pfsData * kPhasors * tPhasors[b] # [HARRIS Eq 7.8, 9.3] + pfsBinData = pfsData * kPhasors[0:Nup] # Output Nup samples yc[n] for every input sample xBase[m] - nEnd = nLo + Nup - if nEnd <= Ndft: - outBinData = pfsBinData[nLo : nEnd] - else: - outBinData = np.concatenate((pfsBinData[nLo : Ndft], - pfsBinData[0 : nEnd - Ndft])) - nLo = nEnd % Ndft + outBinData = pfsBinData * tPhasors[b] polyYc[:, b] = outBinData yc = polyYc.T.reshape(1, Nup * Nblocks)[0]