Skip to content
Snippets Groups Projects
Commit caad923e authored by Eric Kooistra's avatar Eric Kooistra
Browse files

Ported tc_mmf_wpfb_unit.py and tc_mmf_wpfb_unit_functional.py to RadioHDL...

Ported tc_mmf_wpfb_unit.py and tc_mmf_wpfb_unit_functional.py to RadioHDL because of used data file paths.
parent 5e306f4b
No related branches found
No related tags found
No related merge requests found
#! /usr/bin/env python
###############################################################################
#
# Copyright (C) 2012
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
"""Test case for the wpfb_unit entity.
This test case verifies the quality of the wideband poly phase filterbank.
The testcase can be runned automatically via the wpfb_unit.py script as part
of a regression test:
> wpfb_unit.py --hold
It is also possible to run this manually within modelsim. Take care that the
generics of python and vhdl correspond to eachother.
Within modelsim:
> lp wpfb (this will load th wpfb library)
- Double click the tb_mmf_wpfb_unit simulation configuration.
> as 10
> run -all
Then in a separate console run this script as:
> python tc_mmf_wpfb_unit.py --unb 0 --bn 0
"""
###############################################################################
# System imports
import test_case
import node_io
import unb_apertif as apr
import pi_diag_block_gen
import pi_diag_data_buffer
import pi_fil_ppf_w
import dsp_test
import sys, os
import subprocess
import pylab as pl
import numpy as np
import scipy as sp
import random
from tools import *
from common import *
# Create a test case object
tc = test_case.Testcase('TB - ', '')
# Constants/Generics that are shared between VHDL and Python
# Name Value Default Description
# START_VHDL_GENERICS
g_wb_factor = 1 # The Wideband factor (must be power of 2)
g_nof_chan = 0 # The exponent (for 2) that defines the number of time-multiplexed input channels
g_nof_wb_streams = 1 # The number of parallel streams
g_nof_points = 64 # The size of the FFT
g_nof_taps = 16 # The number of taps in the filter
g_in_dat_w = 8 # Input width of the FFT
g_out_dat_w = 16 # Output width of the FFT
g_use_separate = False # When False: FFT input is considered Complex. When True: FFT input is 2xReal
g_nof_blocks = 4 # The number of FFT blocks that are simulated per BG period # (must be >= 1 and power of 2 due to that BG c_bg_ram_size must be > 1 and power of 2 to avoid unused addresses)
# END_VHDL_GENERICS
# Overwrite generics with argumented generics from autoscript or command line.
if tc.generics != None:
g_wb_factor = tc.generics['g_wb_factor']
g_nof_wb_streams = tc.generics['g_nof_wb_streams']
g_nof_chan = tc.generics['g_nof_chan']
g_nof_points = tc.generics['g_nof_points']
g_nof_taps = tc.generics['g_nof_taps']
g_in_dat_w = tc.generics['g_in_dat_w']
g_out_dat_w = tc.generics['g_out_dat_w']
g_use_separate = tc.generics['g_use_separate']
g_nof_blocks = tc.generics['g_nof_blocks']
tc.append_log(3, '>>>')
tc.append_log(1, '>>> Title : Test bench for wpfb_unit entity with %d points and %d taps' % (g_nof_points, g_nof_taps))
tc.append_log(3, '>>>')
tc.append_log(3, '')
tc.set_result('PASSED')
# Check whether the generics combination can be handled
if g_nof_chan != 0:
g_nof_chan = 0
tc.append_log(2, 'WARNING: Forced g_nof_chan=0 because multiple channels are not yet supported in this testcase ')
c_fft_type = "wide"
tc.append_log(3, '')
tc.append_log(3, '>>> VHDL generic settings:')
tc.append_log(3, '')
tc.append_log(3, ' g_nof_chan = %d' % g_nof_chan )
tc.append_log(3, ' g_nof_wb_streams = %d' % g_nof_wb_streams )
tc.append_log(3, ' g_wb_factor = %d' % g_wb_factor )
tc.append_log(3, ' g_nof_points = %d' % g_nof_points )
tc.append_log(3, ' g_nof_taps = %d' % g_nof_taps )
tc.append_log(3, ' g_in_dat_w = %d' % g_in_dat_w )
tc.append_log(3, ' g_out_dat_w = %d' % g_out_dat_w )
tc.append_log(3, ' g_use_separate = %s' % g_use_separate)
tc.append_log(3, ' g_nof_blocks = %d' % g_nof_blocks )
tc.append_log(3, '')
c_full_scale = 2**(g_in_dat_w-1)-1
c_real_sigma = 1.0 # standard deviation (sigma) defined as nof in_dat quantization steps
c_imag_sigma = 2.0 # standard deviation (sigma) defined as nof in_dat quantization steps
# FFT scaling
c_fft_float_gain = g_nof_points
c_fft_vhdl_gain = 2**(g_out_dat_w - g_in_dat_w)
c_fft_scale = 1.0 * c_fft_float_gain / c_fft_vhdl_gain
# Python waveform constants
c_real_select = 'sine' # select waveform: 'sine', 'square', 'impulse', 'noise', 'gaussian', 'zero' (default)
c_real_ampl = 0.8 # normalized full scale amplitude for sine, square, impulse and noise input
c_real_phase = 0.0 # sine phase in degrees
c_real_freq = 2 # nof periods per g_nof_points
c_real_index = 1 # time index within g_nof_points
c_real_noiselevel = 1.0 # added ADC noise level in units of 1 bit
c_real_seed = 1 # random seed for noise signal
c_imag_select = 'gaussian' # select waveform: 'sine', 'square', 'impulse', 'noise', 'gaussian', 'zero' (default)
c_imag_ampl = 1.0*c_imag_sigma/c_full_scale
c_imag_ampl = 0.0 # ... comment this line to enable gaussian
c_imag_phase = 45.0
c_imag_freq = 5
c_imag_index = 1
c_imag_noiselevel = 0.0
c_imag_seed = 2
tc.append_log(3, '')
tc.append_log(3, '>>> Waveform settings:')
tc.append_log(3, '')
tc.append_log(3, ' c_real_select = %s' % c_real_select)
tc.append_log(3, ' c_real_sigma = %7.3f' % c_real_sigma)
tc.append_log(3, ' c_real_ampl = %7.3f' % c_real_ampl)
tc.append_log(3, ' c_real_phase = %7.3f' % c_real_phase)
tc.append_log(3, ' c_real_freq = %7.3f' % c_real_freq)
tc.append_log(3, ' c_real_index = %d' % c_real_index)
tc.append_log(3, ' c_real_noiselevel = %7.3f' % c_real_noiselevel)
tc.append_log(3, ' c_real_seed = %7.3f' % c_real_seed)
tc.append_log(3, '')
tc.append_log(3, ' c_imag_select = %s' % c_imag_select)
tc.append_log(3, ' c_imag_sigma = %7.3f' % c_imag_sigma)
tc.append_log(3, ' c_imag_ampl = %7.3f' % c_imag_ampl)
tc.append_log(3, ' c_imag_phase = %7.3f' % c_imag_phase)
tc.append_log(3, ' c_imag_freq = %7.3f' % c_imag_freq)
tc.append_log(3, ' c_imag_index = %d' % c_imag_index)
tc.append_log(3, ' c_imag_noiselevel = %7.3f' % c_imag_noiselevel)
tc.append_log(3, ' c_imag_seed = %7.3f' % c_imag_seed)
tc.append_log(3, '')
# Python specific constants
c_sample_freq_mhz = 800
c_dsp_clk_period_ns = 5
g_nof_wb_streams = g_wb_factor
c_nof_input_channels = 2**g_nof_chan
c_frame_size = g_nof_points/g_wb_factor
c_nof_integrations = g_nof_blocks # The number of spectrums that are averaged, must be <= g_nof_blocks
c_channel_length = g_nof_blocks*g_nof_points # number of time series samples per input channel
c_bg_ram_size = c_nof_input_channels*g_nof_blocks*c_frame_size
c_stimuli_length = c_nof_input_channels*c_channel_length # total number of time series samples for the BG that generates all channels
c_integration_length = c_nof_integrations*g_nof_points # number of time series samples per input channel that are used for integration
c_blocks_per_sync = g_nof_taps+4 # choose sync interval little bit longer than the WPFB impulse response
c_nof_input_signals = 1 # Represents the real and imaginary input or input A and B
c_coefs_input_file ="../../../../modules/Lofar/pfs/src/data/Coeffs16384Kaiser-quant.dat"
c_nof_coefs_in_file = 16384
c_downsample_factor = c_nof_coefs_in_file/(g_nof_taps*g_nof_points) # use Coeffs16384Kaiser-quant.dat as lookup table in case c_downsample_factor != 1
c_write_coefs = 1
c_write_ones = 0
c_write_to_file = 0
c_plot_enable = 1
c_max_impl_loss_dB = 1 # Maximum allowed implementation loss in dB's
#c_subband_period_ns = g_nof_points*1e3/c_sample_freq_mhz # Subband period in ns = BSN block time in ns
c_subband_period_ns = g_nof_points/g_wb_factor*c_dsp_clk_period_ns # Subband period in ns = BSN block time in ns
c_wpfb_response_ns = c_subband_period_ns*g_nof_taps # WPFB response time in ns
c_sync_interval_ns = c_subband_period_ns*c_blocks_per_sync # sync interval in ns
# Create access object for nodes
io = node_io.NodeIO(tc.nodeImages, tc.base_ip)
# Create block generator instance
bg = pi_diag_block_gen.PiDiagBlockGen(tc, io, g_nof_wb_streams, ramSizePerChannel=c_bg_ram_size)
# Create databuffer instances
db_re = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'REAL', nofStreams=g_nof_wb_streams, ramSizePerStream=c_stimuli_length/g_wb_factor)
db_im = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'IMAG', nofStreams=g_nof_wb_streams, ramSizePerStream=c_stimuli_length/g_wb_factor)
# Create filter instance
fil = pi_fil_ppf_w.PiFilPpfw(tc, io, c_nof_input_signals, g_nof_taps, g_nof_points, g_wb_factor)
# Create dsp_test instance for helpful methods
dsp_test = dsp_test.DspTest(g_nof_points, g_in_dat_w, g_out_dat_w)
def gen_filter_hex_files(c_nof_taps = 4, c_wb_factor = 4, c_nof_points = 16):
# Create mif files for coefficient memories.
create_mifs_cmd = "python ../../../filter/src/python/create_mifs.py -t %d -w %d -b %d -o" % (c_nof_taps, c_wb_factor, c_nof_points)
cm = subprocess.Popen(create_mifs_cmd, cwd=r'../../../filter/src/python/', shell=True, close_fds=True)
return
if __name__ == "__main__":
###############################################################################
# Generate the hex files for simulation. If hex files do not yet
# exist it is required to start this script twice.
###############################################################################
gen_filter_hex_files(g_nof_taps, g_wb_factor, g_nof_points)
###############################################################################
#
# Write filtercoefficients to memory of the Wideband Poly Phase Filter Bank:
#
###############################################################################
if c_write_coefs == 1:
# Read all the coefficients from the file into a list.
coefs_list_from_file =[]
f = file(c_coefs_input_file, "r")
for i in range(c_nof_coefs_in_file):
s = int(f.readline())
s = s & (2**apr.c_coefs_width-1)
coefs_list_from_file.append(s)
f.close()
# Downsample the list to the number of required coefficients, based on the g_nof_taps and the c_fft_size
coefs_list = []
for i in range(g_nof_taps*g_nof_points):
s = coefs_list_from_file[i*c_downsample_factor]
coefs_list.append(s)
# Write the coefficients to the memories
all_taps = []
if(c_write_ones == 0):
for l in range(c_nof_input_signals):
for k in range(g_wb_factor):
for j in range(g_nof_taps):
write_list=[]
for i in range(g_nof_points/g_wb_factor):
write_list.append(coefs_list[j*g_nof_points+i*g_wb_factor + g_wb_factor-1-k])
write_list_rev = []
for i in range(g_nof_points/g_wb_factor): # Reverse the list
write_list_rev.append(write_list[g_nof_points/g_wb_factor-i-1])
fil.write_coefs(write_list_rev,l,j,k)
all_taps.append(write_list_rev)
coef_totals = []
# for i in all_taps:
# print i
# for i in range(g_nof_points):
# coef_total = 0
# for j in range(g_nof_taps):
# coef_total = coef_total + all_taps[j][i]
# coef_totals.append(coef_total)
# print coef_total
else:
all_ones = []
all_zeros = []
for i in range(g_nof_points/g_wb_factor):
all_ones.append(0x4000)
all_zeros.append(0x0)
for l in range(c_nof_input_signals):
for k in range(g_wb_factor):
for j in range(g_nof_taps):
if(j==0):
fil.write_coefs(all_ones,l,j,k)
else:
fil.write_coefs(all_zeros,l,j,k)
###############################################################################
#
# Prepare input stimuli for Wideband Poly Phase Filter Bank
#
###############################################################################
# Prepare data time-series stimuli that can be indexed like : series[channel_nr][time_nr] of real and imag values or complex values, where time_nr is 0:c_channel_length-1
# Create a floating point real and imag input signal xf.
xf_real_series=[]
xf_imag_series=[]
for i in range(c_nof_input_channels):
xf_real_series.append(dsp_test.create_waveform(sel=c_real_select, ampl=c_real_ampl, phase=c_real_phase, freq=c_real_freq+i, timeIndex=c_real_index+i, seed=c_real_seed+i, noiseLevel=c_real_noiselevel, length=c_channel_length))
xf_imag_series.append(dsp_test.create_waveform(sel=c_imag_select, ampl=c_imag_ampl, phase=c_imag_phase, freq=c_imag_freq+i, timeIndex=c_imag_index+i, seed=c_imag_seed+i, noiseLevel=c_imag_noiselevel, length=c_channel_length))
# Quantize xf --> ADC --> xq]
xq_real_series=[]
xq_imag_series=[]
for i in range(c_nof_input_channels):
xq_real_series.append(dsp_test.adc_quantize_waveform(xf_real_series[i]))
xq_imag_series.append(dsp_test.adc_quantize_waveform(xf_imag_series[i]))
# Convert to complex array
xf_complex_series=[]
xq_complex_series=[]
for i in range(c_nof_input_channels):
xf_complex_series.append(dsp_test.to_complex_list(xf_real_series[i], xf_imag_series[i]))
xq_complex_series.append(dsp_test.to_complex_list(xq_real_series[i], xq_imag_series[i]))
###############################################################################
#
# Write the Wideband Poly Phase Filter Bank input stimuli to the BG
#
###############################################################################
# Prepare xq stimuli order for block generator : list[time_nr * channel_nr]
xq_real = []
xq_imag = []
for i in range(c_channel_length):
for j in range(c_nof_input_channels):
xq_real.append(xq_real_series[j][i])
xq_imag.append(xq_imag_series[j][i])
bg_vhdl_data = dsp_test.concatenate_two_lists(xq_real, xq_imag, g_in_dat_w)
# Write setting for the block generator:
bg.write_block_gen_settings(samplesPerPacket=c_frame_size, blocksPerSync=c_blocks_per_sync, gapSize=0, memLowAddr=0, memHighAddr=c_bg_ram_size-1, BSNInit=0)
# Write the stimuli to the block generator and enable the block generator
for i in range(g_wb_factor):
send_data = []
for j in range(c_stimuli_length/g_wb_factor):
send_data.append(bg_vhdl_data[g_wb_factor*j+i]) #
bg.write_waveform_ram(data=send_data, channelNr= i)
bg.write_enable()
# Poll the diag_data_buffer to check if the response is there.
# Retry after 3 seconds so we don't issue too many MM reads in case of simulation.
#do_until_ge(db_re.read_nof_words, ms_retry=3000, val=c_stimuli_length/g_wb_factor, s_timeout=3600)
# Run simulator to into the second sync interval to ensure a fresh second capture in diag_data_buffer with stable WPFB output (because c_sync_interval_ns > c_wpfb_response_ns)
currentTime = io.simIO.getSimTime()
triggerTime = currentTime[0] + c_wpfb_response_ns + c_sync_interval_ns*2
tc.append_log(3, ' Wait until simulation time has reached %d ns' % triggerTime)
do_until_gt(io.simIO.getSimTime, triggerTime, s_timeout=3600)
###############################################################################
#
# Read FFT output from data buffer
#
###############################################################################
# Read the spectrums from the databuffer
Xqqq_real = []
Xqqq_imag = []
for i in range(g_wb_factor):
Xqqq_real.append(db_re.read_data_buffer(streamNr=i, n=c_stimuli_length/g_wb_factor, radix='dec', width=g_out_dat_w))
Xqqq_imag.append(db_im.read_data_buffer(streamNr=i, n=c_stimuli_length/g_wb_factor, radix='dec', width=g_out_dat_w))
Xqqq_real = flatten(Xqqq_real)
Xqqq_imag = flatten(Xqqq_imag)
###############################################################################
#
# Reformat VHDL FFT output spectrum
#
###############################################################################
# Create array with spectrums that can be indexed like : array[channel_nr][integration_nr][points_nr] of complex values
Xqqq_complex_array = []
for h in range(c_nof_input_channels):
integrations_complex = []
for i in range(c_nof_integrations):
spectrum_complex = []
for j in range(g_wb_factor):
for k in range(g_nof_points/g_wb_factor):
spectrum_real = Xqqq_real[((h + i*c_nof_input_channels)*g_nof_points + j*c_stimuli_length)/g_wb_factor + k]
spectrum_imag = Xqqq_imag[((h + i*c_nof_input_channels)*g_nof_points + j*c_stimuli_length)/g_wb_factor + k]
spectrum_complex.append(complex(spectrum_real, spectrum_imag))
integrations_complex.append(spectrum_complex)
Xqqq_complex_array.append(integrations_complex)
# Get Xqqq_complex_array data in same list format as the time-series input data
Xqqq_complex_series = []
for h in range(c_nof_input_channels):
integrations_complex = []
for i in range(c_nof_integrations):
integrations_complex += Xqqq_complex_array[h][i][0:g_nof_points]
Xqqq_complex_series.append(integrations_complex)
###############################################################################
#
# Calculate the reference FFT output spectrums with Python
#
###############################################################################
# Use same format as for VHDL FFT output spectrum
# Create FFT outputs for xf and for xq
Xfff_complex_array = []
Xqff_complex_array = []
for h in range(c_nof_input_channels):
Xfff_complex = []
Xqff_complex = []
for i in range(c_nof_integrations):
f = xf_complex_series[h][i*g_nof_points:(i+1)*g_nof_points]
q = xq_complex_series[h][i*g_nof_points:(i+1)*g_nof_points]
spectrum_f = sp.fft(f, g_nof_points) / c_fft_scale
spectrum_q = sp.fft(q, g_nof_points) / c_fft_scale
Xfff_complex.append(spectrum_f.tolist())
Xqff_complex.append(spectrum_q.tolist())
Xfff_complex_array.append(Xfff_complex)
Xqff_complex_array.append(Xqff_complex)
###############################################################################
#
# Dynamic ranges
#
###############################################################################
tc.append_log(3, '')
tc.append_log(3, '>>> Dynamic ranges:')
tc.append_log(3, '')
tc.append_log(3, ' FFT input full scale = %d' % dsp_test.inFullScale)
tc.append_log(3, ' FFT output full scale = %d' % dsp_test.outFullScale)
tc.append_log(3, '')
for h in range(c_nof_input_channels):
ma_real = max_abs(xq_real_series[h][0:c_integration_length])
ma_imag = max_abs(xq_imag_series[h][0:c_integration_length])
tc.append_log(3, ' FFT input max_abs real channel %d = %d (= %4.1f%%)' % (h, ma_real, 100.0*ma_real/dsp_test.inFullScale))
tc.append_log(3, ' FFT input max_abs imag channel %d = %d (= %4.1f%%)' % (h, ma_imag, 100.0*ma_imag/dsp_test.inFullScale))
tc.append_log(3, '')
for h in range(c_nof_input_channels):
ma_real = max_abs(dsp_test.real_from_complex_list(Xqqq_complex_series[h][0:c_integration_length]))
ma_imag = max_abs(dsp_test.imag_from_complex_list(Xqqq_complex_series[h][0:c_integration_length]))
tc.append_log(3, ' FFT input max_abs real channel %d = %d (= %4.1f%%)' % (h, ma_real, 100.0*ma_real/dsp_test.outFullScale))
tc.append_log(3, ' FFT input max_abs imag channel %d = %d (= %4.1f%%)' % (h, ma_imag, 100.0*ma_imag/dsp_test.outFullScale))
###############################################################################
#
# Theoretical SNR
#
###############################################################################
tc.append_log(3, '')
tc.append_log(3, '>>> Theoretical SNR:')
tc.append_log(3, '')
tc.append_log(3, ' SNR after ADC for full scale uniform noise = %7.2f dB' % dsp_test.snr_db_noise)
tc.append_log(3, ' SNR after ADC for full scale sine = %7.2f dB' % dsp_test.snr_db_sine)
tc.append_log(3, '')
tc.append_log(3, ' FFT gain = %7.2f dB' % dsp_test.fft_gain_db)
tc.append_log(3, '')
tc.append_log(3, ' SNR in FFT bin for full scale uniform noise = %7.2f dB' % dsp_test.fft_snr_db_noise)
tc.append_log(3, ' SNR in FFT bin for full scale sine = %7.2f dB' % dsp_test.fft_snr_db_sine)
###############################################################################
#
# Calculate FFT input and output powers
#
###############################################################################
# Calculate FFT input sample powers
xf_power_array = []
xq_power_array = []
xe_power_array = []
for h in range(c_nof_input_channels):
f = xf_complex_series[h][0:c_integration_length]
q = xq_complex_series[h][0:c_integration_length]
e = subtract_list(f, q)
xf_power_array.append(sum(dsp_test.calc_powers(f)))
xq_power_array.append(sum(dsp_test.calc_powers(q)))
xe_power_array.append(sum(dsp_test.calc_powers(e)))
# Calculate FFT output sample powers
Xfff_power_array = []
Xqff_power_array = []
Xqqq_power_array = []
Xeff_power_array = []
Xeee_power_array = []
for h in range(c_nof_input_channels):
Xfff_power = []
Xqff_power = []
Xqqq_power = []
Xeff_power = []
Xeee_power = []
for j in range(c_nof_integrations):
fff = Xfff_complex_array[h][j]
qff = Xqff_complex_array[h][j]
qqq = Xqqq_complex_array[h][j]
eff = subtract_list(fff, qff)
eee = subtract_list(fff, qqq)
Xfff_power.append(dsp_test.calc_powers(fff))
Xqff_power.append(dsp_test.calc_powers(qff))
Xqqq_power.append(dsp_test.calc_powers(qqq))
Xeff_power.append(dsp_test.calc_powers(eff))
Xeee_power.append(dsp_test.calc_powers(eee))
Xfff_power_array.append(Xfff_power)
Xqff_power_array.append(Xqff_power)
Xqqq_power_array.append(Xqqq_power)
Xeff_power_array.append(Xeff_power)
Xeee_power_array.append(Xeee_power)
# Estimate average sample power for c_nof_integrations
Xfff_power_avg_array = []
Xqff_power_avg_array = []
Xqqq_power_avg_array = []
Xeff_power_avg_array = []
Xeee_power_avg_array = []
for h in range(c_nof_input_channels):
Xfff_power_avg = []
Xqff_power_avg = []
Xqqq_power_avg = []
Xeff_power_avg = []
Xeee_power_avg = []
for i in range(g_nof_points):
Xfff_sum = 0
Xqff_sum = 0
Xqqq_sum = 0
Xeff_sum = 0
Xeee_sum = 0
for j in range(c_nof_integrations):
Xfff_sum += Xfff_power_array[h][j][i]
Xqff_sum += Xqff_power_array[h][j][i]
Xqqq_sum += Xqqq_power_array[h][j][i]
Xeff_sum += Xeff_power_array[h][j][i]
Xeee_sum += Xeee_power_array[h][j][i]
Xfff_power_avg.append(Xfff_sum/c_nof_integrations)
Xqff_power_avg.append(Xqff_sum/c_nof_integrations)
Xqqq_power_avg.append(Xqqq_sum/c_nof_integrations)
Xeff_power_avg.append(Xeff_sum/c_nof_integrations)
Xeee_power_avg.append(Xeee_sum/c_nof_integrations)
Xfff_power_avg_array.append(Xfff_power_avg)
Xqff_power_avg_array.append(Xqff_power_avg)
Xqqq_power_avg_array.append(Xqqq_power_avg)
Xeff_power_avg_array.append(Xeff_power_avg)
Xeee_power_avg_array.append(Xeee_power_avg)
###############################################################################
#
# Method 1 : Simulated SNR = sine bin power / median noise bin power
#
###############################################################################
# Apply only for 'sine' at one input and 0 at the other
c_select_a = c_real_select=='sine' and c_imag_ampl==0;
c_select_b = c_imag_select=='sine' and c_real_ampl==0;
if c_select_a or c_select_b:
# Estimate SNR of VHDL and Python output
tc.append_log(3, '')
tc.append_log(3, '>>> Simulated SNR = sine bin power / median noise bin power:')
tc.append_log(3, '')
snr_vhdl = []
snr_py = []
for h in range(c_nof_input_channels):
if c_select_a:
Rqqq_power_avg_array = Xqqq_power_avg_array[h][::2]
Rqff_power_avg_array = Xqff_power_avg_array[h][::2]
else:
Rqqq_power_avg_array = Xqqq_power_avg_array[h][1::2]
Rqff_power_avg_array = Xqff_power_avg_array[h][1::2]
snr_vhdl.append(dsp_test.calc_snr_sine_bin_noise_bin(Rqqq_power_avg_array))
snr_py.append( dsp_test.calc_snr_sine_bin_noise_bin(Rqff_power_avg_array))
tc.append_log(5, ' Rqqq_power_avg_array : %s' % Rqqq_power_avg_array)
tc.append_log(5, ' Rqff_power_avg_array : %s' % Rqff_power_avg_array)
tc.append_log(3, ' SNR in FFT bin with VHDL channel %d = %7.3f dB' % (h, dsp_test.to_dB(snr_vhdl[h])))
tc.append_log(3, ' SNR in FFT bin with Python channel %d = %7.3f dB' % (h, dsp_test.to_dB(snr_py[h])))
tc.append_log(3, '')
tc.append_log(3, ' FFT implementation loss channel %d = %7.3f dB' % (h, dsp_test.to_dB(snr_py[h]/snr_vhdl[h])))
if (dsp_test.to_dB(snr_py[h]) - dsp_test.to_dB(snr_vhdl[h]) > c_max_impl_loss_dB):
tc.append_log(2, ' FFT implementation loss is greater then maximum allowed loss of %d dB.' % c_max_impl_loss_dB )
tc.set_result('FAILED')
# Write the VHDL results to a file for comparison with other FFT-types.
if(c_write_to_file == 1):
f = file(c_file_name, "w")
for i in range(g_nof_points):
s = " %x %x \n" % (int(Xqqq_complex_array[0][0][i].real), int(Xqqq_complex_array[0][0][i].imag))
f.write(s)
f.close()
tc.append_log(3, '')
tc.append_log(3, 'Spectra data is written to file ' + c_file_name)
###############################################################################
#
# Method 2 : Simulated SNR = float signal power / calculated error power
#
###############################################################################
# Apply only for complex FFT output, so without seperate (because the separate function is not calculated in Python)
if g_use_separate==False:
# FFT input and output sample powers
tc.append_log(3, '')
tc.append_log(3, '>>> Simulated SNR:')
tc.append_log(3, '')
for h in range(c_nof_input_channels):
SNRe = xf_power_array[h] / xe_power_array[h]
SNReff = sum(Xfff_power_avg_array[h]) / sum(Xeff_power_avg_array[h]) # Note that SNReff = SNRe
SNReee = sum(Xfff_power_avg_array[h]) / sum(Xeee_power_avg_array[h])
ILqee = SNReff / SNReee
tc.append_log(3, ' SNRe at input of FFT channel %d = %7.3f dB' % (h, dsp_test.to_dB(SNRe)))
tc.append_log(3, ' SNReff at output of FFT channel %d = %7.3f dB' % (h, dsp_test.to_dB(SNReff)))
tc.append_log(3, ' SNReee at output of FFT channel %d = %7.3f dB' % (h, dsp_test.to_dB(SNReee)))
tc.append_log(3, '')
tc.append_log(3, ' FFT implementation loss channel %d = %7.3f dB' % (h, dsp_test.to_dB(ILqee)))
###############################################################################
# Plot
if c_plot_enable == 1:
# Create variable for the axes.
x_as = []
for i in range(c_integration_length):
x_as.append(i)
xi_as = []
for i in range(0, c_integration_length, g_nof_points):
xi_as.append(i)
xi_dots = [0]*c_nof_integrations # place dots on the x-axis to mark the start of the FFT blocks and to force also have y = 0 in range of y-axis
f_as = []
for i in range(g_nof_points):
f_as.append(i*c_sample_freq_mhz/g_nof_points) # 0 : N/2-1, N/2 : N-1
# Plot the time domain signals.
for h in range(c_nof_input_channels):
pl.figure(h+1)
pl.subplot(211)
pl.title('Input time-series data (%d points)' % g_nof_points)
pl.plot(x_as, xq_real_series[h][0:c_integration_length], 'b-', xi_as, xi_dots, 'ro')
pl.xlabel('FFT xq real input (A)')
pl.ylabel('Amplitude')
pl.grid()
pl.subplot(212)
pl.plot(x_as, xq_imag_series[h][0:c_integration_length], 'b-', xi_as, xi_dots, 'ro')
pl.xlabel('FFT xq imag input (B)')
pl.ylabel('Amplitude')
pl.grid()
# Plot the frequency domain signals.
for h in range(c_nof_input_channels):
pl.figure(100+h)
pl.subplot(211)
pl.title('VHDL %s FFT output bins (%d points)' % (c_fft_type, g_nof_points))
pl.plot(x_as, dsp_test.real_from_complex_list(Xqqq_complex_series[h][0:c_integration_length]), 'b-', xi_as, xi_dots, 'ro')
pl.xlabel('FFT Xqqq real output (A)')
pl.ylabel('Amplitude')
pl.grid()
pl.subplot(212)
pl.plot(x_as, dsp_test.imag_from_complex_list(Xqqq_complex_series[h][0:c_integration_length]), 'b-', xi_as, xi_dots, 'ro')
pl.xlabel('FFT Xqqq imag output (B)')
pl.ylabel('Amplitude')
pl.grid()
## Plot the VHDL spectrums
for h in range(c_nof_input_channels):
pl.figure(200+h)
pl.subplot(211)
pl.title('VHDL %s FFT averaged output spectrum (%d points)' % (c_fft_type, g_nof_points))
pl.plot(f_as, dsp_test.to_dB(Xqqq_power_avg_array[h]), 'b-', [0],[0], 'ro') # at start of FFT bins at (0,0) to force 0 dB to be in range of y-axis
pl.xlabel('VHDL Xqqq Spectrum')
pl.ylabel('Power (dB)')
pl.grid()
pl.subplot(212)
pl.plot(f_as, dsp_test.to_dB(Xqff_power_avg_array[h]), 'b-', [0],[0], 'ro') # at start of FFT bins at (0,0) to force 0 dB to be in range of y-axis
pl.xlabel('Python Xqff Spectrum')
pl.ylabel('Power (dB)')
pl.grid()
pl.show()
###############################################################################
# End
tc.set_section_id('')
tc.append_log(3, '')
tc.append_log(3, '>>>')
tc.append_log(0, '>>> Test bench result: %s' % tc.get_result())
tc.append_log(3, '>>>')
sys.exit(tc.get_result())
\ No newline at end of file
#! /usr/bin/env python
###############################################################################
#
# Copyright (C) 2012
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
""" Functional test case for the wpfb_unit entity.
This testcase should be used to test the different types of configuration
of the wpfb unit, meaning both wide- and narrowband configurations.
Wideband configuration: g_wb_factor > 1 and g_nof_chan = 0
Narrowband configuration: g_nof_chan > 0 and g_wb_factor = 1
Both aforementioned configurations can be parallelized using
the g_nof_wb_streams generic.
The testcase applies the same input stimuli to all inputs and checks
if all generated spectrums are equal.
Usage:
> python tc_mmf_wpfb_unit_functional.py --unb 0 --bn 0 --sim
"""
###############################################################################
# System imports
import test_case
import node_io
import pi_diag_block_gen
import pi_diag_data_buffer
import pi_fil_ppf_w
import dsp_test
import sys, os
import subprocess
import pylab as pl
import numpy as np
import scipy as sp
import random
from tools import *
from common import *
# Create a test case object
tc = test_case.Testcase('TB - ', '')
# Constants/Generics that are shared between VHDL and Python
# Name Value Default Description
# START_VHDL_GENERICS
g_wb_factor = 1 # The Wideband factor (must be power of 2)
g_nof_wb_streams = 8 # The number of parallel wideband streams
g_nof_chan = 1 # The exponent (for 2) that defines the number of time-multiplexed input channels
g_nof_points = 64 # The size of the FFT
g_nof_taps = 8 # The number of taps in the filter
g_in_dat_w = 6 # Input width of the FFT
g_out_dat_w = 16 # Output width of the FFT
g_use_separate = False # When False: FFT input is considered Complex. When True: FFT input is 2xReal
g_nof_blocks = 8 # The number of FFT blocks that are simulated per BG period # (must be >= 1 and power of 2 due to that BG c_bg_ram_size must be > 1 and power of 2 to avoid unused addresses)
# END_VHDL_GENERICS
# Overwrite generics with argumented generics from autoscript or command line.
if tc.generics != None:
g_wb_factor = tc.generics['g_wb_factor']
g_nof_wb_streams = tc.generics['g_nof_wb_streams']
g_nof_chan = tc.generics['g_nof_chan']
g_nof_points = tc.generics['g_nof_points']
g_nof_taps = tc.generics['g_nof_taps']
g_in_dat_w = tc.generics['g_in_dat_w']
g_out_dat_w = tc.generics['g_out_dat_w']
g_use_separate = tc.generics['g_use_separate']
g_nof_blocks = tc.generics['g_nof_blocks']
tc.append_log(3, '>>>')
tc.append_log(1, '>>> Title : Test bench for wpfb_unit entity with %d points and %d taps' % (g_nof_points, g_nof_taps))
tc.append_log(1, '>>> This is a functional test bench that can be used to test the' )
tc.append_log(1, '>>> differesnt types of configuaration. ' )
tc.append_log(3, '>>>')
tc.append_log(3, '')
tc.set_result('PASSED')
if g_nof_chan != 0 and g_wb_factor != 1 :
g_nof_chan = 0
tc.append_log(2, 'WARNING: Forced g_nof_chan=0 because g_wb_factor is not 1. When wb_factor > 1 it is not ')
tc.append_log(2, ' possible to use time-multiplexed input channels.')
tc.append_log(3, '')
tc.append_log(3, '>>> VHDL generic settings:')
tc.append_log(3, '')
tc.append_log(3, ' g_wb_factor = %d' % g_wb_factor )
tc.append_log(3, ' g_nof_wb_streams = %d' % g_nof_wb_streams)
tc.append_log(3, ' g_nof_chan = %d' % g_nof_chan )
tc.append_log(3, ' g_nof_points = %d' % g_nof_points )
tc.append_log(3, ' g_nof_taps = %d' % g_nof_taps )
tc.append_log(3, ' g_in_dat_w = %d' % g_in_dat_w )
tc.append_log(3, ' g_out_dat_w = %d' % g_out_dat_w )
tc.append_log(3, ' g_use_separate = %s' % g_use_separate )
tc.append_log(3, ' g_nof_blocks = %d' % g_nof_blocks )
tc.append_log(3, '')
# Python waveform constants
c_real_select = 'sine' # select waveform: 'sine', 'square', 'impulse', 'noise', 'gaussian', 'zero' (default)
c_real_ampl = 1.0 # normalized full scale amplitude for sine, square, impulse and noise input
c_real_phase = 0.0 # sine phase in degrees
c_real_freq = 2 # nof periods per c_nof_points
c_real_index = 1 # time index within c_nof_points
c_real_noiselevel = 1.0 # added ADC noise level in units of 1 bit
c_real_seed = 1 # random seed for noise signal
c_imag_select = 'gaussian' # select waveform: 'sine', 'square', 'impulse', 'noise', 'gaussian', 'zero' (default)
c_imag_ampl = 1.0
c_imag_ampl = 0.0 # ... comment this line to enable gaussian
c_imag_phase = 45.0
c_imag_freq = 5
c_imag_index = 1
c_imag_noiselevel = 0.0
c_imag_seed = 2
tc.append_log(3, '')
tc.append_log(3, '>>> Waveform settings:')
tc.append_log(3, '')
tc.append_log(3, ' c_real_select = %s' % c_real_select)
tc.append_log(3, ' c_real_ampl = %7.3f' % c_real_ampl)
tc.append_log(3, ' c_real_phase = %7.3f' % c_real_phase)
tc.append_log(3, ' c_real_freq = %7.3f' % c_real_freq)
tc.append_log(3, ' c_real_index = %d' % c_real_index)
tc.append_log(3, ' c_real_noiselevel = %7.3f' % c_real_noiselevel)
tc.append_log(3, ' c_real_seed = %7.3f' % c_real_seed)
tc.append_log(3, '')
tc.append_log(3, ' c_imag_select = %s' % c_imag_select)
tc.append_log(3, ' c_imag_ampl = %7.3f' % c_imag_ampl)
tc.append_log(3, ' c_imag_phase = %7.3f' % c_imag_phase)
tc.append_log(3, ' c_imag_freq = %7.3f' % c_imag_freq)
tc.append_log(3, ' c_imag_index = %d' % c_imag_index)
tc.append_log(3, ' c_imag_noiselevel = %7.3f' % c_imag_noiselevel)
tc.append_log(3, ' c_imag_seed = %7.3f' % c_imag_seed)
tc.append_log(3, '')
# Python specific constants
c_nof_input_channels = 2**g_nof_chan
c_nof_streams = g_nof_wb_streams*c_nof_input_channels
c_nof_bg_streams = g_nof_wb_streams*g_wb_factor
c_frame_size = c_nof_input_channels*g_nof_points/g_wb_factor
c_nof_integrations = g_nof_blocks # The number of spectrums that are averaged, must be <= c_nof_blocks
c_channel_length = g_nof_blocks*g_nof_points # number of time series samples per input channel
c_bg_ram_size = g_nof_blocks*c_frame_size
c_stimuli_length = c_nof_input_channels*c_channel_length # total number of time series samples for the BG that generates all channels
c_integration_length = c_nof_integrations*g_nof_points # number of time series samples per input channel that are used for integration
c_blocks_per_sync = g_nof_taps+4 # choose sync interval little bit longer than the WPFB impulse response
c_dp_clk_period_ns = 5
c_subband_period_ns = c_frame_size * c_dp_clk_period_ns # Subband period in ns = BSN block time in ns
c_sync_interval_ns = c_subband_period_ns*c_blocks_per_sync # sync interval in ns
# Create access object for nodes
io = node_io.NodeIO(tc.nodeImages, tc.base_ip)
# Create block generator instance
bg = pi_diag_block_gen.PiDiagBlockGen(tc, io, c_nof_bg_streams, ramSizePerChannel=c_bg_ram_size)
# Create databuffer instances
db_re = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'REAL', nofStreams=c_nof_bg_streams, ramSizePerStream=c_stimuli_length/g_wb_factor)
db_im = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'IMAG', nofStreams=c_nof_bg_streams, ramSizePerStream=c_stimuli_length/g_wb_factor)
# Create dsp_test instance for helpful methods
dsp_test = dsp_test.DspTest(g_nof_points, g_in_dat_w, g_out_dat_w)
def gen_filter_hex_files(c_nof_taps = 4, c_wb_factor = 4, c_nof_points = 16):
# Create mif files for coefficient memories.
create_mifs_cmd = "python ../../../filter/src/python/create_mifs.py -t %d -w %d -b %d " % (c_nof_taps, c_wb_factor, c_nof_points)
cm = subprocess.Popen(create_mifs_cmd, cwd=r'../../../filter/src/python/', shell=True, close_fds=True)
return
if __name__ == "__main__":
###############################################################################
# Generate the hex files for simulation. If hex files do not yet
# exist it is required to start this script twice.
###############################################################################
gen_filter_hex_files(g_nof_taps, g_wb_factor, g_nof_points)
###############################################################################
#
# Write the Wideband Poly Phase Filter Bank input stimuli to the BG
#
###############################################################################
# Prepare stimuli order for block generator
xf_real = []
xf_imag = []
for i in range(c_nof_streams):
xf_real.append(dsp_test.create_waveform(sel=c_real_select, ampl=c_real_ampl, phase=c_real_phase, freq=c_real_freq, timeIndex=c_real_index, seed=c_real_seed, noiseLevel=c_real_noiselevel, length=c_channel_length))
xf_imag.append(dsp_test.create_waveform(sel=c_imag_select, ampl=c_imag_ampl, phase=c_imag_phase, freq=c_imag_freq, timeIndex=c_imag_index, seed=c_imag_seed, noiseLevel=c_imag_noiselevel, length=c_channel_length))
xq_real=[]
xq_imag=[]
for i in range(c_nof_streams):
xq_real.append(dsp_test.adc_quantize_waveform(xf_real[i]))
xq_imag.append(dsp_test.adc_quantize_waveform(xf_imag[i]))
streams = []
for i in range(c_nof_streams):
streams.append(dsp_test.concatenate_two_lists(xq_real[i], xq_imag[i], g_in_dat_w))
# Apply the alternated order due to the channels and create the wb_streams
wb_stream = []
for h in range(g_nof_wb_streams):
chnl_stream = []
for i in range(c_channel_length):
for j in range(c_nof_input_channels):
chnl_stream.append(streams[h*c_nof_input_channels + j][i])
wb_stream.append(chnl_stream)
# Use the wb_streams to create the actual streams for the block generator
# Write the stimuli to the block generator
for h in range(g_nof_wb_streams):
for i in range(g_wb_factor):
send_data = []
for j in range(c_channel_length/g_wb_factor):
for k in range(c_nof_input_channels):
send_data.append(wb_stream[h][g_wb_factor*c_nof_input_channels*j+i*c_nof_input_channels+k]) #
bg.write_waveform_ram(data=send_data, channelNr= h*g_wb_factor + i)
# Write setting for the block generator:
bg.write_block_gen_settings(samplesPerPacket=c_frame_size, blocksPerSync=c_blocks_per_sync, gapSize=0, memLowAddr=0, memHighAddr=c_bg_ram_size-1, BSNInit=0)
# enable the block generator
bg.write_enable()
# Poll the diag_data_buffer to check if the response is there.
# Retry after 3 seconds so we don't issue too many MM reads in case of simulation.
# do_until_ge(db_re.read_nof_words, ms_retry=3000, val=c_stimuli_length/g_wb_factor, s_timeout=3600)
# Run simulator to into the second sync interval to ensure a fresh second capture in diag_data_buffer with stable WPFB output (because c_sync_interval_ns > c_wpfb_response_ns)
cur_time = io.simIO.getSimTime()
print "cur_time=" + str(cur_time)
print "c_sync_interval_ns=" + str(c_sync_interval_ns)
do_until_gt(io.simIO.getSimTime, cur_time[0] + c_sync_interval_ns*5, s_timeout=3600)
###############################################################################
#
# Read FFT output from data buffer
#
###############################################################################
# Read the spectrums from the databuffer
X_real = []
X_imag = []
for i in range(c_nof_bg_streams):
X_real.append(db_re.read_data_buffer(streamNr=i, n=c_stimuli_length/g_wb_factor, radix='dec', width=g_out_dat_w))
X_imag.append(db_im.read_data_buffer(streamNr=i, n=c_stimuli_length/g_wb_factor, radix='dec', width=g_out_dat_w))
# re-arrange output data
spectrums_real = []
spectrums_imag = []
if(g_wb_factor > 1): # For wideband
for h in range(g_nof_wb_streams):
spectrum_real = []
spectrum_imag = []
for i in range(g_wb_factor):
for k in range(c_channel_length/g_wb_factor):
spectrum_real.append(X_real[h*g_wb_factor+i][0][k])
spectrum_imag.append(X_imag[h*g_wb_factor+i][0][k])
spectrums_real.append(spectrum_real)
spectrums_imag.append(spectrum_imag)
else: # For narrowband
for h in range(g_nof_wb_streams):
for j in range(c_nof_input_channels):
spectrum_real = []
spectrum_imag = []
for k in range(g_nof_blocks):
for l in range(g_nof_points):
spectrum_real.append(X_real[h][0][k*g_nof_points*c_nof_input_channels+j*g_nof_points+l])
spectrum_imag.append(X_imag[h][0][k*g_nof_points*c_nof_input_channels+j*g_nof_points+l])
spectrums_real.append(spectrum_real)
spectrums_imag.append(spectrum_imag)
# Check if all output spectrums are equal.
for i in range(g_nof_wb_streams*c_nof_input_channels):
for j in range(c_channel_length):
if spectrums_real[0][j] != spectrums_real[i][j]:
tc.append_log(2, '>>> Real part of spectrums are unequal')
tc.set_result('FAILED')
if spectrums_imag[0][j] != spectrums_imag[i][j]:
tc.append_log(2, '>>> Imag part of spectrums are unequal')
tc.set_result('FAILED')
tc.append_log(5, 'spectrums_real')
for i in range(len(spectrums_real[0])):
tc.append_log(5, 'spectrums_real = %d ' % spectrums_real[0][i] )
tc.append_log(5, 'spectrums_imag')
for i in range(len(spectrums_imag[0])):
tc.append_log(5, 'spectrums_imag = %d ' % spectrums_imag[0][i] )
tc.append_log(3, '')
tc.append_log(3, '>>>')
tc.append_log(0, '>>> Test bench result: %s' % tc.get_result())
tc.append_log(3, '>>>')
sys.exit(tc.get_result())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment