diff --git a/libraries/dsp/wpfb/tb/python/tc_mmf_wpfb_unit.py b/libraries/dsp/wpfb/tb/python/tc_mmf_wpfb_unit.py new file mode 100644 index 0000000000000000000000000000000000000000..340ddee800b95d05db1c363666bb4c92dc25651b --- /dev/null +++ b/libraries/dsp/wpfb/tb/python/tc_mmf_wpfb_unit.py @@ -0,0 +1,686 @@ +#! /usr/bin/env python +############################################################################### +# +# Copyright (C) 2012 +# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/> +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +############################################################################### + +"""Test case for the wpfb_unit entity. + + This test case verifies the quality of the wideband poly phase filterbank. + + The testcase can be runned automatically via the wpfb_unit.py script as part + of a regression test: + + > wpfb_unit.py --hold + + It is also possible to run this manually within modelsim. Take care that the + generics of python and vhdl correspond to eachother. + Within modelsim: + > lp wpfb (this will load th wpfb library) + - Double click the tb_mmf_wpfb_unit simulation configuration. + > as 10 + > run -all + + Then in a separate console run this script as: + > python tc_mmf_wpfb_unit.py --unb 0 --bn 0 + +""" + +############################################################################### +# System imports +import test_case +import node_io +import unb_apertif as apr +import pi_diag_block_gen +import pi_diag_data_buffer +import pi_fil_ppf_w +import dsp_test +import sys, os +import subprocess +import pylab as pl +import numpy as np +import scipy as sp +import random +from tools import * +from common import * + +# Create a test case object +tc = test_case.Testcase('TB - ', '') + +# Constants/Generics that are shared between VHDL and Python +# Name Value Default Description +# START_VHDL_GENERICS +g_wb_factor = 1 # The Wideband factor (must be power of 2) +g_nof_chan = 0 # The exponent (for 2) that defines the number of time-multiplexed input channels +g_nof_wb_streams = 1 # The number of parallel streams +g_nof_points = 64 # The size of the FFT +g_nof_taps = 16 # The number of taps in the filter +g_in_dat_w = 8 # Input width of the FFT +g_out_dat_w = 16 # Output width of the FFT +g_use_separate = False # When False: FFT input is considered Complex. When True: FFT input is 2xReal +g_nof_blocks = 4 # The number of FFT blocks that are simulated per BG period # (must be >= 1 and power of 2 due to that BG c_bg_ram_size must be > 1 and power of 2 to avoid unused addresses) +# END_VHDL_GENERICS + +# Overwrite generics with argumented generics from autoscript or command line. +if tc.generics != None: + g_wb_factor = tc.generics['g_wb_factor'] + g_nof_wb_streams = tc.generics['g_nof_wb_streams'] + g_nof_chan = tc.generics['g_nof_chan'] + g_nof_points = tc.generics['g_nof_points'] + g_nof_taps = tc.generics['g_nof_taps'] + g_in_dat_w = tc.generics['g_in_dat_w'] + g_out_dat_w = tc.generics['g_out_dat_w'] + g_use_separate = tc.generics['g_use_separate'] + g_nof_blocks = tc.generics['g_nof_blocks'] + +tc.append_log(3, '>>>') +tc.append_log(1, '>>> Title : Test bench for wpfb_unit entity with %d points and %d taps' % (g_nof_points, g_nof_taps)) +tc.append_log(3, '>>>') +tc.append_log(3, '') +tc.set_result('PASSED') + +# Check whether the generics combination can be handled +if g_nof_chan != 0: + g_nof_chan = 0 + tc.append_log(2, 'WARNING: Forced g_nof_chan=0 because multiple channels are not yet supported in this testcase ') + +c_fft_type = "wide" + +tc.append_log(3, '') +tc.append_log(3, '>>> VHDL generic settings:') +tc.append_log(3, '') +tc.append_log(3, ' g_nof_chan = %d' % g_nof_chan ) +tc.append_log(3, ' g_nof_wb_streams = %d' % g_nof_wb_streams ) +tc.append_log(3, ' g_wb_factor = %d' % g_wb_factor ) +tc.append_log(3, ' g_nof_points = %d' % g_nof_points ) +tc.append_log(3, ' g_nof_taps = %d' % g_nof_taps ) +tc.append_log(3, ' g_in_dat_w = %d' % g_in_dat_w ) +tc.append_log(3, ' g_out_dat_w = %d' % g_out_dat_w ) +tc.append_log(3, ' g_use_separate = %s' % g_use_separate) +tc.append_log(3, ' g_nof_blocks = %d' % g_nof_blocks ) +tc.append_log(3, '') + +c_full_scale = 2**(g_in_dat_w-1)-1 + +c_real_sigma = 1.0 # standard deviation (sigma) defined as nof in_dat quantization steps +c_imag_sigma = 2.0 # standard deviation (sigma) defined as nof in_dat quantization steps + +# FFT scaling +c_fft_float_gain = g_nof_points +c_fft_vhdl_gain = 2**(g_out_dat_w - g_in_dat_w) +c_fft_scale = 1.0 * c_fft_float_gain / c_fft_vhdl_gain + +# Python waveform constants +c_real_select = 'sine' # select waveform: 'sine', 'square', 'impulse', 'noise', 'gaussian', 'zero' (default) +c_real_ampl = 0.8 # normalized full scale amplitude for sine, square, impulse and noise input +c_real_phase = 0.0 # sine phase in degrees +c_real_freq = 2 # nof periods per g_nof_points +c_real_index = 1 # time index within g_nof_points +c_real_noiselevel = 1.0 # added ADC noise level in units of 1 bit +c_real_seed = 1 # random seed for noise signal + +c_imag_select = 'gaussian' # select waveform: 'sine', 'square', 'impulse', 'noise', 'gaussian', 'zero' (default) +c_imag_ampl = 1.0*c_imag_sigma/c_full_scale +c_imag_ampl = 0.0 # ... comment this line to enable gaussian +c_imag_phase = 45.0 +c_imag_freq = 5 +c_imag_index = 1 +c_imag_noiselevel = 0.0 +c_imag_seed = 2 + +tc.append_log(3, '') +tc.append_log(3, '>>> Waveform settings:') +tc.append_log(3, '') +tc.append_log(3, ' c_real_select = %s' % c_real_select) +tc.append_log(3, ' c_real_sigma = %7.3f' % c_real_sigma) +tc.append_log(3, ' c_real_ampl = %7.3f' % c_real_ampl) +tc.append_log(3, ' c_real_phase = %7.3f' % c_real_phase) +tc.append_log(3, ' c_real_freq = %7.3f' % c_real_freq) +tc.append_log(3, ' c_real_index = %d' % c_real_index) +tc.append_log(3, ' c_real_noiselevel = %7.3f' % c_real_noiselevel) +tc.append_log(3, ' c_real_seed = %7.3f' % c_real_seed) +tc.append_log(3, '') +tc.append_log(3, ' c_imag_select = %s' % c_imag_select) +tc.append_log(3, ' c_imag_sigma = %7.3f' % c_imag_sigma) +tc.append_log(3, ' c_imag_ampl = %7.3f' % c_imag_ampl) +tc.append_log(3, ' c_imag_phase = %7.3f' % c_imag_phase) +tc.append_log(3, ' c_imag_freq = %7.3f' % c_imag_freq) +tc.append_log(3, ' c_imag_index = %d' % c_imag_index) +tc.append_log(3, ' c_imag_noiselevel = %7.3f' % c_imag_noiselevel) +tc.append_log(3, ' c_imag_seed = %7.3f' % c_imag_seed) +tc.append_log(3, '') + +# Python specific constants +c_sample_freq_mhz = 800 +c_dsp_clk_period_ns = 5 +g_nof_wb_streams = g_wb_factor +c_nof_input_channels = 2**g_nof_chan +c_frame_size = g_nof_points/g_wb_factor +c_nof_integrations = g_nof_blocks # The number of spectrums that are averaged, must be <= g_nof_blocks +c_channel_length = g_nof_blocks*g_nof_points # number of time series samples per input channel +c_bg_ram_size = c_nof_input_channels*g_nof_blocks*c_frame_size +c_stimuli_length = c_nof_input_channels*c_channel_length # total number of time series samples for the BG that generates all channels +c_integration_length = c_nof_integrations*g_nof_points # number of time series samples per input channel that are used for integration + +c_blocks_per_sync = g_nof_taps+4 # choose sync interval little bit longer than the WPFB impulse response +c_nof_input_signals = 1 # Represents the real and imaginary input or input A and B +c_coefs_input_file ="../../../../modules/Lofar/pfs/src/data/Coeffs16384Kaiser-quant.dat" +c_nof_coefs_in_file = 16384 +c_downsample_factor = c_nof_coefs_in_file/(g_nof_taps*g_nof_points) # use Coeffs16384Kaiser-quant.dat as lookup table in case c_downsample_factor != 1 +c_write_coefs = 1 +c_write_ones = 0 +c_write_to_file = 0 +c_plot_enable = 1 +c_max_impl_loss_dB = 1 # Maximum allowed implementation loss in dB's + +#c_subband_period_ns = g_nof_points*1e3/c_sample_freq_mhz # Subband period in ns = BSN block time in ns +c_subband_period_ns = g_nof_points/g_wb_factor*c_dsp_clk_period_ns # Subband period in ns = BSN block time in ns +c_wpfb_response_ns = c_subband_period_ns*g_nof_taps # WPFB response time in ns +c_sync_interval_ns = c_subband_period_ns*c_blocks_per_sync # sync interval in ns + +# Create access object for nodes +io = node_io.NodeIO(tc.nodeImages, tc.base_ip) + +# Create block generator instance +bg = pi_diag_block_gen.PiDiagBlockGen(tc, io, g_nof_wb_streams, ramSizePerChannel=c_bg_ram_size) + +# Create databuffer instances +db_re = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'REAL', nofStreams=g_nof_wb_streams, ramSizePerStream=c_stimuli_length/g_wb_factor) +db_im = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'IMAG', nofStreams=g_nof_wb_streams, ramSizePerStream=c_stimuli_length/g_wb_factor) + +# Create filter instance +fil = pi_fil_ppf_w.PiFilPpfw(tc, io, c_nof_input_signals, g_nof_taps, g_nof_points, g_wb_factor) + +# Create dsp_test instance for helpful methods +dsp_test = dsp_test.DspTest(g_nof_points, g_in_dat_w, g_out_dat_w) + +def gen_filter_hex_files(c_nof_taps = 4, c_wb_factor = 4, c_nof_points = 16): + # Create mif files for coefficient memories. + create_mifs_cmd = "python ../../../filter/src/python/create_mifs.py -t %d -w %d -b %d -o" % (c_nof_taps, c_wb_factor, c_nof_points) + cm = subprocess.Popen(create_mifs_cmd, cwd=r'../../../filter/src/python/', shell=True, close_fds=True) + return + +if __name__ == "__main__": + + ############################################################################### + # Generate the hex files for simulation. If hex files do not yet + # exist it is required to start this script twice. + ############################################################################### + gen_filter_hex_files(g_nof_taps, g_wb_factor, g_nof_points) + + ############################################################################### + # + # Write filtercoefficients to memory of the Wideband Poly Phase Filter Bank: + # + ############################################################################### + if c_write_coefs == 1: + # Read all the coefficients from the file into a list. + coefs_list_from_file =[] + f = file(c_coefs_input_file, "r") + for i in range(c_nof_coefs_in_file): + s = int(f.readline()) + s = s & (2**apr.c_coefs_width-1) + coefs_list_from_file.append(s) + f.close() + + # Downsample the list to the number of required coefficients, based on the g_nof_taps and the c_fft_size + coefs_list = [] + for i in range(g_nof_taps*g_nof_points): + s = coefs_list_from_file[i*c_downsample_factor] + coefs_list.append(s) + + # Write the coefficients to the memories + all_taps = [] + if(c_write_ones == 0): + for l in range(c_nof_input_signals): + for k in range(g_wb_factor): + for j in range(g_nof_taps): + write_list=[] + for i in range(g_nof_points/g_wb_factor): + write_list.append(coefs_list[j*g_nof_points+i*g_wb_factor + g_wb_factor-1-k]) + write_list_rev = [] + for i in range(g_nof_points/g_wb_factor): # Reverse the list + write_list_rev.append(write_list[g_nof_points/g_wb_factor-i-1]) + fil.write_coefs(write_list_rev,l,j,k) + all_taps.append(write_list_rev) + coef_totals = [] +# for i in all_taps: +# print i +# for i in range(g_nof_points): +# coef_total = 0 +# for j in range(g_nof_taps): +# coef_total = coef_total + all_taps[j][i] +# coef_totals.append(coef_total) +# print coef_total + else: + all_ones = [] + all_zeros = [] + for i in range(g_nof_points/g_wb_factor): + all_ones.append(0x4000) + all_zeros.append(0x0) + + for l in range(c_nof_input_signals): + for k in range(g_wb_factor): + for j in range(g_nof_taps): + if(j==0): + fil.write_coefs(all_ones,l,j,k) + else: + fil.write_coefs(all_zeros,l,j,k) + + ############################################################################### + # + # Prepare input stimuli for Wideband Poly Phase Filter Bank + # + ############################################################################### + + # Prepare data time-series stimuli that can be indexed like : series[channel_nr][time_nr] of real and imag values or complex values, where time_nr is 0:c_channel_length-1 + + # Create a floating point real and imag input signal xf. + xf_real_series=[] + xf_imag_series=[] + for i in range(c_nof_input_channels): + xf_real_series.append(dsp_test.create_waveform(sel=c_real_select, ampl=c_real_ampl, phase=c_real_phase, freq=c_real_freq+i, timeIndex=c_real_index+i, seed=c_real_seed+i, noiseLevel=c_real_noiselevel, length=c_channel_length)) + xf_imag_series.append(dsp_test.create_waveform(sel=c_imag_select, ampl=c_imag_ampl, phase=c_imag_phase, freq=c_imag_freq+i, timeIndex=c_imag_index+i, seed=c_imag_seed+i, noiseLevel=c_imag_noiselevel, length=c_channel_length)) + # Quantize xf --> ADC --> xq] + xq_real_series=[] + xq_imag_series=[] + for i in range(c_nof_input_channels): + xq_real_series.append(dsp_test.adc_quantize_waveform(xf_real_series[i])) + xq_imag_series.append(dsp_test.adc_quantize_waveform(xf_imag_series[i])) + # Convert to complex array + xf_complex_series=[] + xq_complex_series=[] + for i in range(c_nof_input_channels): + xf_complex_series.append(dsp_test.to_complex_list(xf_real_series[i], xf_imag_series[i])) + xq_complex_series.append(dsp_test.to_complex_list(xq_real_series[i], xq_imag_series[i])) + + ############################################################################### + # + # Write the Wideband Poly Phase Filter Bank input stimuli to the BG + # + ############################################################################### + + # Prepare xq stimuli order for block generator : list[time_nr * channel_nr] + xq_real = [] + xq_imag = [] + for i in range(c_channel_length): + for j in range(c_nof_input_channels): + xq_real.append(xq_real_series[j][i]) + xq_imag.append(xq_imag_series[j][i]) + + bg_vhdl_data = dsp_test.concatenate_two_lists(xq_real, xq_imag, g_in_dat_w) + + # Write setting for the block generator: + bg.write_block_gen_settings(samplesPerPacket=c_frame_size, blocksPerSync=c_blocks_per_sync, gapSize=0, memLowAddr=0, memHighAddr=c_bg_ram_size-1, BSNInit=0) + + # Write the stimuli to the block generator and enable the block generator + for i in range(g_wb_factor): + send_data = [] + for j in range(c_stimuli_length/g_wb_factor): + send_data.append(bg_vhdl_data[g_wb_factor*j+i]) # + bg.write_waveform_ram(data=send_data, channelNr= i) + bg.write_enable() + + # Poll the diag_data_buffer to check if the response is there. + # Retry after 3 seconds so we don't issue too many MM reads in case of simulation. + #do_until_ge(db_re.read_nof_words, ms_retry=3000, val=c_stimuli_length/g_wb_factor, s_timeout=3600) + + # Run simulator to into the second sync interval to ensure a fresh second capture in diag_data_buffer with stable WPFB output (because c_sync_interval_ns > c_wpfb_response_ns) + currentTime = io.simIO.getSimTime() + triggerTime = currentTime[0] + c_wpfb_response_ns + c_sync_interval_ns*2 + tc.append_log(3, ' Wait until simulation time has reached %d ns' % triggerTime) + do_until_gt(io.simIO.getSimTime, triggerTime, s_timeout=3600) + + ############################################################################### + # + # Read FFT output from data buffer + # + ############################################################################### + + # Read the spectrums from the databuffer + Xqqq_real = [] + Xqqq_imag = [] + for i in range(g_wb_factor): + Xqqq_real.append(db_re.read_data_buffer(streamNr=i, n=c_stimuli_length/g_wb_factor, radix='dec', width=g_out_dat_w)) + Xqqq_imag.append(db_im.read_data_buffer(streamNr=i, n=c_stimuli_length/g_wb_factor, radix='dec', width=g_out_dat_w)) + + Xqqq_real = flatten(Xqqq_real) + Xqqq_imag = flatten(Xqqq_imag) + + ############################################################################### + # + # Reformat VHDL FFT output spectrum + # + ############################################################################### + + # Create array with spectrums that can be indexed like : array[channel_nr][integration_nr][points_nr] of complex values + Xqqq_complex_array = [] + for h in range(c_nof_input_channels): + integrations_complex = [] + for i in range(c_nof_integrations): + spectrum_complex = [] + for j in range(g_wb_factor): + for k in range(g_nof_points/g_wb_factor): + spectrum_real = Xqqq_real[((h + i*c_nof_input_channels)*g_nof_points + j*c_stimuli_length)/g_wb_factor + k] + spectrum_imag = Xqqq_imag[((h + i*c_nof_input_channels)*g_nof_points + j*c_stimuli_length)/g_wb_factor + k] + spectrum_complex.append(complex(spectrum_real, spectrum_imag)) + integrations_complex.append(spectrum_complex) + Xqqq_complex_array.append(integrations_complex) + + # Get Xqqq_complex_array data in same list format as the time-series input data + Xqqq_complex_series = [] + for h in range(c_nof_input_channels): + integrations_complex = [] + for i in range(c_nof_integrations): + integrations_complex += Xqqq_complex_array[h][i][0:g_nof_points] + Xqqq_complex_series.append(integrations_complex) + + ############################################################################### + # + # Calculate the reference FFT output spectrums with Python + # + ############################################################################### + + # Use same format as for VHDL FFT output spectrum + # Create FFT outputs for xf and for xq + Xfff_complex_array = [] + Xqff_complex_array = [] + for h in range(c_nof_input_channels): + Xfff_complex = [] + Xqff_complex = [] + for i in range(c_nof_integrations): + f = xf_complex_series[h][i*g_nof_points:(i+1)*g_nof_points] + q = xq_complex_series[h][i*g_nof_points:(i+1)*g_nof_points] + spectrum_f = sp.fft(f, g_nof_points) / c_fft_scale + spectrum_q = sp.fft(q, g_nof_points) / c_fft_scale + Xfff_complex.append(spectrum_f.tolist()) + Xqff_complex.append(spectrum_q.tolist()) + Xfff_complex_array.append(Xfff_complex) + Xqff_complex_array.append(Xqff_complex) + + ############################################################################### + # + # Dynamic ranges + # + ############################################################################### + + tc.append_log(3, '') + tc.append_log(3, '>>> Dynamic ranges:') + tc.append_log(3, '') + tc.append_log(3, ' FFT input full scale = %d' % dsp_test.inFullScale) + tc.append_log(3, ' FFT output full scale = %d' % dsp_test.outFullScale) + tc.append_log(3, '') + for h in range(c_nof_input_channels): + ma_real = max_abs(xq_real_series[h][0:c_integration_length]) + ma_imag = max_abs(xq_imag_series[h][0:c_integration_length]) + tc.append_log(3, ' FFT input max_abs real channel %d = %d (= %4.1f%%)' % (h, ma_real, 100.0*ma_real/dsp_test.inFullScale)) + tc.append_log(3, ' FFT input max_abs imag channel %d = %d (= %4.1f%%)' % (h, ma_imag, 100.0*ma_imag/dsp_test.inFullScale)) + tc.append_log(3, '') + for h in range(c_nof_input_channels): + ma_real = max_abs(dsp_test.real_from_complex_list(Xqqq_complex_series[h][0:c_integration_length])) + ma_imag = max_abs(dsp_test.imag_from_complex_list(Xqqq_complex_series[h][0:c_integration_length])) + tc.append_log(3, ' FFT input max_abs real channel %d = %d (= %4.1f%%)' % (h, ma_real, 100.0*ma_real/dsp_test.outFullScale)) + tc.append_log(3, ' FFT input max_abs imag channel %d = %d (= %4.1f%%)' % (h, ma_imag, 100.0*ma_imag/dsp_test.outFullScale)) + + + ############################################################################### + # + # Theoretical SNR + # + ############################################################################### + + tc.append_log(3, '') + tc.append_log(3, '>>> Theoretical SNR:') + tc.append_log(3, '') + tc.append_log(3, ' SNR after ADC for full scale uniform noise = %7.2f dB' % dsp_test.snr_db_noise) + tc.append_log(3, ' SNR after ADC for full scale sine = %7.2f dB' % dsp_test.snr_db_sine) + tc.append_log(3, '') + tc.append_log(3, ' FFT gain = %7.2f dB' % dsp_test.fft_gain_db) + tc.append_log(3, '') + tc.append_log(3, ' SNR in FFT bin for full scale uniform noise = %7.2f dB' % dsp_test.fft_snr_db_noise) + tc.append_log(3, ' SNR in FFT bin for full scale sine = %7.2f dB' % dsp_test.fft_snr_db_sine) + + + ############################################################################### + # + # Calculate FFT input and output powers + # + ############################################################################### + + # Calculate FFT input sample powers + xf_power_array = [] + xq_power_array = [] + xe_power_array = [] + for h in range(c_nof_input_channels): + f = xf_complex_series[h][0:c_integration_length] + q = xq_complex_series[h][0:c_integration_length] + e = subtract_list(f, q) + xf_power_array.append(sum(dsp_test.calc_powers(f))) + xq_power_array.append(sum(dsp_test.calc_powers(q))) + xe_power_array.append(sum(dsp_test.calc_powers(e))) + + # Calculate FFT output sample powers + Xfff_power_array = [] + Xqff_power_array = [] + Xqqq_power_array = [] + Xeff_power_array = [] + Xeee_power_array = [] + for h in range(c_nof_input_channels): + Xfff_power = [] + Xqff_power = [] + Xqqq_power = [] + Xeff_power = [] + Xeee_power = [] + for j in range(c_nof_integrations): + fff = Xfff_complex_array[h][j] + qff = Xqff_complex_array[h][j] + qqq = Xqqq_complex_array[h][j] + eff = subtract_list(fff, qff) + eee = subtract_list(fff, qqq) + Xfff_power.append(dsp_test.calc_powers(fff)) + Xqff_power.append(dsp_test.calc_powers(qff)) + Xqqq_power.append(dsp_test.calc_powers(qqq)) + Xeff_power.append(dsp_test.calc_powers(eff)) + Xeee_power.append(dsp_test.calc_powers(eee)) + Xfff_power_array.append(Xfff_power) + Xqff_power_array.append(Xqff_power) + Xqqq_power_array.append(Xqqq_power) + Xeff_power_array.append(Xeff_power) + Xeee_power_array.append(Xeee_power) + + # Estimate average sample power for c_nof_integrations + Xfff_power_avg_array = [] + Xqff_power_avg_array = [] + Xqqq_power_avg_array = [] + Xeff_power_avg_array = [] + Xeee_power_avg_array = [] + for h in range(c_nof_input_channels): + Xfff_power_avg = [] + Xqff_power_avg = [] + Xqqq_power_avg = [] + Xeff_power_avg = [] + Xeee_power_avg = [] + for i in range(g_nof_points): + Xfff_sum = 0 + Xqff_sum = 0 + Xqqq_sum = 0 + Xeff_sum = 0 + Xeee_sum = 0 + for j in range(c_nof_integrations): + Xfff_sum += Xfff_power_array[h][j][i] + Xqff_sum += Xqff_power_array[h][j][i] + Xqqq_sum += Xqqq_power_array[h][j][i] + Xeff_sum += Xeff_power_array[h][j][i] + Xeee_sum += Xeee_power_array[h][j][i] + Xfff_power_avg.append(Xfff_sum/c_nof_integrations) + Xqff_power_avg.append(Xqff_sum/c_nof_integrations) + Xqqq_power_avg.append(Xqqq_sum/c_nof_integrations) + Xeff_power_avg.append(Xeff_sum/c_nof_integrations) + Xeee_power_avg.append(Xeee_sum/c_nof_integrations) + Xfff_power_avg_array.append(Xfff_power_avg) + Xqff_power_avg_array.append(Xqff_power_avg) + Xqqq_power_avg_array.append(Xqqq_power_avg) + Xeff_power_avg_array.append(Xeff_power_avg) + Xeee_power_avg_array.append(Xeee_power_avg) + + + ############################################################################### + # + # Method 1 : Simulated SNR = sine bin power / median noise bin power + # + ############################################################################### + + # Apply only for 'sine' at one input and 0 at the other + c_select_a = c_real_select=='sine' and c_imag_ampl==0; + c_select_b = c_imag_select=='sine' and c_real_ampl==0; + + if c_select_a or c_select_b: + # Estimate SNR of VHDL and Python output + tc.append_log(3, '') + tc.append_log(3, '>>> Simulated SNR = sine bin power / median noise bin power:') + tc.append_log(3, '') + snr_vhdl = [] + snr_py = [] + for h in range(c_nof_input_channels): + if c_select_a: + Rqqq_power_avg_array = Xqqq_power_avg_array[h][::2] + Rqff_power_avg_array = Xqff_power_avg_array[h][::2] + else: + Rqqq_power_avg_array = Xqqq_power_avg_array[h][1::2] + Rqff_power_avg_array = Xqff_power_avg_array[h][1::2] + snr_vhdl.append(dsp_test.calc_snr_sine_bin_noise_bin(Rqqq_power_avg_array)) + snr_py.append( dsp_test.calc_snr_sine_bin_noise_bin(Rqff_power_avg_array)) + tc.append_log(5, ' Rqqq_power_avg_array : %s' % Rqqq_power_avg_array) + tc.append_log(5, ' Rqff_power_avg_array : %s' % Rqff_power_avg_array) + tc.append_log(3, ' SNR in FFT bin with VHDL channel %d = %7.3f dB' % (h, dsp_test.to_dB(snr_vhdl[h]))) + tc.append_log(3, ' SNR in FFT bin with Python channel %d = %7.3f dB' % (h, dsp_test.to_dB(snr_py[h]))) + tc.append_log(3, '') + tc.append_log(3, ' FFT implementation loss channel %d = %7.3f dB' % (h, dsp_test.to_dB(snr_py[h]/snr_vhdl[h]))) + + if (dsp_test.to_dB(snr_py[h]) - dsp_test.to_dB(snr_vhdl[h]) > c_max_impl_loss_dB): + tc.append_log(2, ' FFT implementation loss is greater then maximum allowed loss of %d dB.' % c_max_impl_loss_dB ) + tc.set_result('FAILED') + + # Write the VHDL results to a file for comparison with other FFT-types. + if(c_write_to_file == 1): + f = file(c_file_name, "w") + for i in range(g_nof_points): + s = " %x %x \n" % (int(Xqqq_complex_array[0][0][i].real), int(Xqqq_complex_array[0][0][i].imag)) + f.write(s) + f.close() + tc.append_log(3, '') + tc.append_log(3, 'Spectra data is written to file ' + c_file_name) + + + ############################################################################### + # + # Method 2 : Simulated SNR = float signal power / calculated error power + # + ############################################################################### + + # Apply only for complex FFT output, so without seperate (because the separate function is not calculated in Python) + if g_use_separate==False: + # FFT input and output sample powers + tc.append_log(3, '') + tc.append_log(3, '>>> Simulated SNR:') + tc.append_log(3, '') + for h in range(c_nof_input_channels): + SNRe = xf_power_array[h] / xe_power_array[h] + SNReff = sum(Xfff_power_avg_array[h]) / sum(Xeff_power_avg_array[h]) # Note that SNReff = SNRe + SNReee = sum(Xfff_power_avg_array[h]) / sum(Xeee_power_avg_array[h]) + ILqee = SNReff / SNReee + tc.append_log(3, ' SNRe at input of FFT channel %d = %7.3f dB' % (h, dsp_test.to_dB(SNRe))) + tc.append_log(3, ' SNReff at output of FFT channel %d = %7.3f dB' % (h, dsp_test.to_dB(SNReff))) + tc.append_log(3, ' SNReee at output of FFT channel %d = %7.3f dB' % (h, dsp_test.to_dB(SNReee))) + tc.append_log(3, '') + tc.append_log(3, ' FFT implementation loss channel %d = %7.3f dB' % (h, dsp_test.to_dB(ILqee))) + + + ############################################################################### + # Plot + if c_plot_enable == 1: + + # Create variable for the axes. + x_as = [] + for i in range(c_integration_length): + x_as.append(i) + xi_as = [] + for i in range(0, c_integration_length, g_nof_points): + xi_as.append(i) + xi_dots = [0]*c_nof_integrations # place dots on the x-axis to mark the start of the FFT blocks and to force also have y = 0 in range of y-axis + f_as = [] + for i in range(g_nof_points): + f_as.append(i*c_sample_freq_mhz/g_nof_points) # 0 : N/2-1, N/2 : N-1 + + # Plot the time domain signals. + for h in range(c_nof_input_channels): + pl.figure(h+1) + pl.subplot(211) + pl.title('Input time-series data (%d points)' % g_nof_points) + pl.plot(x_as, xq_real_series[h][0:c_integration_length], 'b-', xi_as, xi_dots, 'ro') + pl.xlabel('FFT xq real input (A)') + pl.ylabel('Amplitude') + pl.grid() + pl.subplot(212) + pl.plot(x_as, xq_imag_series[h][0:c_integration_length], 'b-', xi_as, xi_dots, 'ro') + pl.xlabel('FFT xq imag input (B)') + pl.ylabel('Amplitude') + pl.grid() + + # Plot the frequency domain signals. + for h in range(c_nof_input_channels): + pl.figure(100+h) + pl.subplot(211) + pl.title('VHDL %s FFT output bins (%d points)' % (c_fft_type, g_nof_points)) + pl.plot(x_as, dsp_test.real_from_complex_list(Xqqq_complex_series[h][0:c_integration_length]), 'b-', xi_as, xi_dots, 'ro') + pl.xlabel('FFT Xqqq real output (A)') + pl.ylabel('Amplitude') + pl.grid() + pl.subplot(212) + pl.plot(x_as, dsp_test.imag_from_complex_list(Xqqq_complex_series[h][0:c_integration_length]), 'b-', xi_as, xi_dots, 'ro') + pl.xlabel('FFT Xqqq imag output (B)') + pl.ylabel('Amplitude') + pl.grid() + + ## Plot the VHDL spectrums + for h in range(c_nof_input_channels): + pl.figure(200+h) + pl.subplot(211) + pl.title('VHDL %s FFT averaged output spectrum (%d points)' % (c_fft_type, g_nof_points)) + pl.plot(f_as, dsp_test.to_dB(Xqqq_power_avg_array[h]), 'b-', [0],[0], 'ro') # at start of FFT bins at (0,0) to force 0 dB to be in range of y-axis + pl.xlabel('VHDL Xqqq Spectrum') + pl.ylabel('Power (dB)') + pl.grid() + pl.subplot(212) + pl.plot(f_as, dsp_test.to_dB(Xqff_power_avg_array[h]), 'b-', [0],[0], 'ro') # at start of FFT bins at (0,0) to force 0 dB to be in range of y-axis + pl.xlabel('Python Xqff Spectrum') + pl.ylabel('Power (dB)') + pl.grid() + + pl.show() + + ############################################################################### + # End + tc.set_section_id('') + tc.append_log(3, '') + tc.append_log(3, '>>>') + tc.append_log(0, '>>> Test bench result: %s' % tc.get_result()) + tc.append_log(3, '>>>') + + sys.exit(tc.get_result()) \ No newline at end of file diff --git a/libraries/dsp/wpfb/tb/python/tc_mmf_wpfb_unit_functional.py b/libraries/dsp/wpfb/tb/python/tc_mmf_wpfb_unit_functional.py new file mode 100644 index 0000000000000000000000000000000000000000..0c3f77693da7145b9994c82282b0a83c9e1cba1a --- /dev/null +++ b/libraries/dsp/wpfb/tb/python/tc_mmf_wpfb_unit_functional.py @@ -0,0 +1,320 @@ +#! /usr/bin/env python +############################################################################### +# +# Copyright (C) 2012 +# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/> +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +############################################################################### + +""" Functional test case for the wpfb_unit entity. + This testcase should be used to test the different types of configuration + of the wpfb unit, meaning both wide- and narrowband configurations. + + Wideband configuration: g_wb_factor > 1 and g_nof_chan = 0 + Narrowband configuration: g_nof_chan > 0 and g_wb_factor = 1 + + Both aforementioned configurations can be parallelized using + the g_nof_wb_streams generic. + + The testcase applies the same input stimuli to all inputs and checks + if all generated spectrums are equal. + + Usage: + + > python tc_mmf_wpfb_unit_functional.py --unb 0 --bn 0 --sim + + +""" + +############################################################################### +# System imports +import test_case +import node_io +import pi_diag_block_gen +import pi_diag_data_buffer +import pi_fil_ppf_w +import dsp_test +import sys, os +import subprocess +import pylab as pl +import numpy as np +import scipy as sp +import random +from tools import * +from common import * + +# Create a test case object +tc = test_case.Testcase('TB - ', '') + +# Constants/Generics that are shared between VHDL and Python +# Name Value Default Description +# START_VHDL_GENERICS +g_wb_factor = 1 # The Wideband factor (must be power of 2) +g_nof_wb_streams = 8 # The number of parallel wideband streams +g_nof_chan = 1 # The exponent (for 2) that defines the number of time-multiplexed input channels +g_nof_points = 64 # The size of the FFT +g_nof_taps = 8 # The number of taps in the filter +g_in_dat_w = 6 # Input width of the FFT +g_out_dat_w = 16 # Output width of the FFT +g_use_separate = False # When False: FFT input is considered Complex. When True: FFT input is 2xReal +g_nof_blocks = 8 # The number of FFT blocks that are simulated per BG period # (must be >= 1 and power of 2 due to that BG c_bg_ram_size must be > 1 and power of 2 to avoid unused addresses) +# END_VHDL_GENERICS + +# Overwrite generics with argumented generics from autoscript or command line. +if tc.generics != None: + g_wb_factor = tc.generics['g_wb_factor'] + g_nof_wb_streams = tc.generics['g_nof_wb_streams'] + g_nof_chan = tc.generics['g_nof_chan'] + g_nof_points = tc.generics['g_nof_points'] + g_nof_taps = tc.generics['g_nof_taps'] + g_in_dat_w = tc.generics['g_in_dat_w'] + g_out_dat_w = tc.generics['g_out_dat_w'] + g_use_separate = tc.generics['g_use_separate'] + g_nof_blocks = tc.generics['g_nof_blocks'] + +tc.append_log(3, '>>>') +tc.append_log(1, '>>> Title : Test bench for wpfb_unit entity with %d points and %d taps' % (g_nof_points, g_nof_taps)) +tc.append_log(1, '>>> This is a functional test bench that can be used to test the' ) +tc.append_log(1, '>>> differesnt types of configuaration. ' ) +tc.append_log(3, '>>>') +tc.append_log(3, '') +tc.set_result('PASSED') + +if g_nof_chan != 0 and g_wb_factor != 1 : + g_nof_chan = 0 + tc.append_log(2, 'WARNING: Forced g_nof_chan=0 because g_wb_factor is not 1. When wb_factor > 1 it is not ') + tc.append_log(2, ' possible to use time-multiplexed input channels.') + +tc.append_log(3, '') +tc.append_log(3, '>>> VHDL generic settings:') +tc.append_log(3, '') +tc.append_log(3, ' g_wb_factor = %d' % g_wb_factor ) +tc.append_log(3, ' g_nof_wb_streams = %d' % g_nof_wb_streams) +tc.append_log(3, ' g_nof_chan = %d' % g_nof_chan ) +tc.append_log(3, ' g_nof_points = %d' % g_nof_points ) +tc.append_log(3, ' g_nof_taps = %d' % g_nof_taps ) +tc.append_log(3, ' g_in_dat_w = %d' % g_in_dat_w ) +tc.append_log(3, ' g_out_dat_w = %d' % g_out_dat_w ) +tc.append_log(3, ' g_use_separate = %s' % g_use_separate ) +tc.append_log(3, ' g_nof_blocks = %d' % g_nof_blocks ) +tc.append_log(3, '') + +# Python waveform constants +c_real_select = 'sine' # select waveform: 'sine', 'square', 'impulse', 'noise', 'gaussian', 'zero' (default) +c_real_ampl = 1.0 # normalized full scale amplitude for sine, square, impulse and noise input +c_real_phase = 0.0 # sine phase in degrees +c_real_freq = 2 # nof periods per c_nof_points +c_real_index = 1 # time index within c_nof_points +c_real_noiselevel = 1.0 # added ADC noise level in units of 1 bit +c_real_seed = 1 # random seed for noise signal + +c_imag_select = 'gaussian' # select waveform: 'sine', 'square', 'impulse', 'noise', 'gaussian', 'zero' (default) +c_imag_ampl = 1.0 +c_imag_ampl = 0.0 # ... comment this line to enable gaussian +c_imag_phase = 45.0 +c_imag_freq = 5 +c_imag_index = 1 +c_imag_noiselevel = 0.0 +c_imag_seed = 2 + +tc.append_log(3, '') +tc.append_log(3, '>>> Waveform settings:') +tc.append_log(3, '') +tc.append_log(3, ' c_real_select = %s' % c_real_select) +tc.append_log(3, ' c_real_ampl = %7.3f' % c_real_ampl) +tc.append_log(3, ' c_real_phase = %7.3f' % c_real_phase) +tc.append_log(3, ' c_real_freq = %7.3f' % c_real_freq) +tc.append_log(3, ' c_real_index = %d' % c_real_index) +tc.append_log(3, ' c_real_noiselevel = %7.3f' % c_real_noiselevel) +tc.append_log(3, ' c_real_seed = %7.3f' % c_real_seed) +tc.append_log(3, '') +tc.append_log(3, ' c_imag_select = %s' % c_imag_select) +tc.append_log(3, ' c_imag_ampl = %7.3f' % c_imag_ampl) +tc.append_log(3, ' c_imag_phase = %7.3f' % c_imag_phase) +tc.append_log(3, ' c_imag_freq = %7.3f' % c_imag_freq) +tc.append_log(3, ' c_imag_index = %d' % c_imag_index) +tc.append_log(3, ' c_imag_noiselevel = %7.3f' % c_imag_noiselevel) +tc.append_log(3, ' c_imag_seed = %7.3f' % c_imag_seed) +tc.append_log(3, '') + + +# Python specific constants +c_nof_input_channels = 2**g_nof_chan +c_nof_streams = g_nof_wb_streams*c_nof_input_channels +c_nof_bg_streams = g_nof_wb_streams*g_wb_factor +c_frame_size = c_nof_input_channels*g_nof_points/g_wb_factor +c_nof_integrations = g_nof_blocks # The number of spectrums that are averaged, must be <= c_nof_blocks +c_channel_length = g_nof_blocks*g_nof_points # number of time series samples per input channel +c_bg_ram_size = g_nof_blocks*c_frame_size +c_stimuli_length = c_nof_input_channels*c_channel_length # total number of time series samples for the BG that generates all channels +c_integration_length = c_nof_integrations*g_nof_points # number of time series samples per input channel that are used for integration +c_blocks_per_sync = g_nof_taps+4 # choose sync interval little bit longer than the WPFB impulse response + +c_dp_clk_period_ns = 5 +c_subband_period_ns = c_frame_size * c_dp_clk_period_ns # Subband period in ns = BSN block time in ns +c_sync_interval_ns = c_subband_period_ns*c_blocks_per_sync # sync interval in ns + + +# Create access object for nodes +io = node_io.NodeIO(tc.nodeImages, tc.base_ip) + +# Create block generator instance +bg = pi_diag_block_gen.PiDiagBlockGen(tc, io, c_nof_bg_streams, ramSizePerChannel=c_bg_ram_size) + +# Create databuffer instances +db_re = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'REAL', nofStreams=c_nof_bg_streams, ramSizePerStream=c_stimuli_length/g_wb_factor) +db_im = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'IMAG', nofStreams=c_nof_bg_streams, ramSizePerStream=c_stimuli_length/g_wb_factor) + +# Create dsp_test instance for helpful methods +dsp_test = dsp_test.DspTest(g_nof_points, g_in_dat_w, g_out_dat_w) + +def gen_filter_hex_files(c_nof_taps = 4, c_wb_factor = 4, c_nof_points = 16): + # Create mif files for coefficient memories. + create_mifs_cmd = "python ../../../filter/src/python/create_mifs.py -t %d -w %d -b %d " % (c_nof_taps, c_wb_factor, c_nof_points) + cm = subprocess.Popen(create_mifs_cmd, cwd=r'../../../filter/src/python/', shell=True, close_fds=True) + return + +if __name__ == "__main__": + + ############################################################################### + # Generate the hex files for simulation. If hex files do not yet + # exist it is required to start this script twice. + ############################################################################### + gen_filter_hex_files(g_nof_taps, g_wb_factor, g_nof_points) + + ############################################################################### + # + # Write the Wideband Poly Phase Filter Bank input stimuli to the BG + # + ############################################################################### + + # Prepare stimuli order for block generator + xf_real = [] + xf_imag = [] + for i in range(c_nof_streams): + xf_real.append(dsp_test.create_waveform(sel=c_real_select, ampl=c_real_ampl, phase=c_real_phase, freq=c_real_freq, timeIndex=c_real_index, seed=c_real_seed, noiseLevel=c_real_noiselevel, length=c_channel_length)) + xf_imag.append(dsp_test.create_waveform(sel=c_imag_select, ampl=c_imag_ampl, phase=c_imag_phase, freq=c_imag_freq, timeIndex=c_imag_index, seed=c_imag_seed, noiseLevel=c_imag_noiselevel, length=c_channel_length)) + + xq_real=[] + xq_imag=[] + for i in range(c_nof_streams): + xq_real.append(dsp_test.adc_quantize_waveform(xf_real[i])) + xq_imag.append(dsp_test.adc_quantize_waveform(xf_imag[i])) + + streams = [] + for i in range(c_nof_streams): + streams.append(dsp_test.concatenate_two_lists(xq_real[i], xq_imag[i], g_in_dat_w)) + + # Apply the alternated order due to the channels and create the wb_streams + wb_stream = [] + for h in range(g_nof_wb_streams): + chnl_stream = [] + for i in range(c_channel_length): + for j in range(c_nof_input_channels): + chnl_stream.append(streams[h*c_nof_input_channels + j][i]) + wb_stream.append(chnl_stream) + + # Use the wb_streams to create the actual streams for the block generator + # Write the stimuli to the block generator + for h in range(g_nof_wb_streams): + for i in range(g_wb_factor): + send_data = [] + for j in range(c_channel_length/g_wb_factor): + for k in range(c_nof_input_channels): + send_data.append(wb_stream[h][g_wb_factor*c_nof_input_channels*j+i*c_nof_input_channels+k]) # + bg.write_waveform_ram(data=send_data, channelNr= h*g_wb_factor + i) + + # Write setting for the block generator: + bg.write_block_gen_settings(samplesPerPacket=c_frame_size, blocksPerSync=c_blocks_per_sync, gapSize=0, memLowAddr=0, memHighAddr=c_bg_ram_size-1, BSNInit=0) + + # enable the block generator + bg.write_enable() + + # Poll the diag_data_buffer to check if the response is there. + # Retry after 3 seconds so we don't issue too many MM reads in case of simulation. + # do_until_ge(db_re.read_nof_words, ms_retry=3000, val=c_stimuli_length/g_wb_factor, s_timeout=3600) + + # Run simulator to into the second sync interval to ensure a fresh second capture in diag_data_buffer with stable WPFB output (because c_sync_interval_ns > c_wpfb_response_ns) + cur_time = io.simIO.getSimTime() + print "cur_time=" + str(cur_time) + print "c_sync_interval_ns=" + str(c_sync_interval_ns) + + do_until_gt(io.simIO.getSimTime, cur_time[0] + c_sync_interval_ns*5, s_timeout=3600) + + + ############################################################################### + # + # Read FFT output from data buffer + # + ############################################################################### + + # Read the spectrums from the databuffer + X_real = [] + X_imag = [] + for i in range(c_nof_bg_streams): + X_real.append(db_re.read_data_buffer(streamNr=i, n=c_stimuli_length/g_wb_factor, radix='dec', width=g_out_dat_w)) + X_imag.append(db_im.read_data_buffer(streamNr=i, n=c_stimuli_length/g_wb_factor, radix='dec', width=g_out_dat_w)) + + # re-arrange output data + spectrums_real = [] + spectrums_imag = [] + if(g_wb_factor > 1): # For wideband + for h in range(g_nof_wb_streams): + spectrum_real = [] + spectrum_imag = [] + for i in range(g_wb_factor): + for k in range(c_channel_length/g_wb_factor): + spectrum_real.append(X_real[h*g_wb_factor+i][0][k]) + spectrum_imag.append(X_imag[h*g_wb_factor+i][0][k]) + spectrums_real.append(spectrum_real) + spectrums_imag.append(spectrum_imag) + else: # For narrowband + for h in range(g_nof_wb_streams): + for j in range(c_nof_input_channels): + spectrum_real = [] + spectrum_imag = [] + for k in range(g_nof_blocks): + for l in range(g_nof_points): + spectrum_real.append(X_real[h][0][k*g_nof_points*c_nof_input_channels+j*g_nof_points+l]) + spectrum_imag.append(X_imag[h][0][k*g_nof_points*c_nof_input_channels+j*g_nof_points+l]) + spectrums_real.append(spectrum_real) + spectrums_imag.append(spectrum_imag) + + # Check if all output spectrums are equal. + for i in range(g_nof_wb_streams*c_nof_input_channels): + for j in range(c_channel_length): + if spectrums_real[0][j] != spectrums_real[i][j]: + tc.append_log(2, '>>> Real part of spectrums are unequal') + tc.set_result('FAILED') + if spectrums_imag[0][j] != spectrums_imag[i][j]: + tc.append_log(2, '>>> Imag part of spectrums are unequal') + tc.set_result('FAILED') + + tc.append_log(5, 'spectrums_real') + for i in range(len(spectrums_real[0])): + tc.append_log(5, 'spectrums_real = %d ' % spectrums_real[0][i] ) + tc.append_log(5, 'spectrums_imag') + for i in range(len(spectrums_imag[0])): + tc.append_log(5, 'spectrums_imag = %d ' % spectrums_imag[0][i] ) + + tc.append_log(3, '') + tc.append_log(3, '>>>') + tc.append_log(0, '>>> Test bench result: %s' % tc.get_result()) + tc.append_log(3, '>>>') + + sys.exit(tc.get_result()) +