Select Git revision
update_ConfigDb.sh
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
tc_bf_unit.py 18.00 KiB
#! /usr/bin/env python
###############################################################################
#
# Copyright (C) 2012
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
"""Test case for the bf_unit entity.
Description:
This testcase is used in conjunction with the tb_bf_unit.vhd file.
Generics for the VHDL testbench are passed to modelsim to configure the
current simulation.
To observe the VHDL signals in the wave window modelsim needs to be
started manually and then c_start_modelsim must be set to 0 and the generics
also must be set manually.
NOTE: Use c_write_bg, c_write_bf_weights and c_write_bf_ss_wide to enable or disable
the time-consuming writes to the MM interface. When disabled, be sure to
have the corresponding .hex files in place. The .hex files can be generated
with the bf_unit.py script.
BEWARE: If the hex files change due to different script settings, then it is
necessary to run the script again to ensure that Modelsim starts with the
latest hex files.
Usage:
> python tc_bf_unit.py --unb 0 --fn 0 --sim
"""
###############################################################################
# System imports
import test_case
import node_io
import unb_apertif as apr
import pi_diag_block_gen
import pi_diag_data_buffer
import pi_ss_ss_wide
import pi_bf_bf
import dsp_test
import sys, os
import subprocess
import pylab as pl
import numpy as np
import scipy as sp
import random
from tools import *
from common import *
import mem_init_file
###############################################################################
# Debug control
# When True, the generator functions and the main script print intermediate
# stimuli, weights and expected values to stdout.
c_debug_print = False
# Create a test case object
# tc is the shared test case handle: it collects the log lines (append_log) and
# holds the overall PASSED/FAILED result (set_result/get_result).
tc = test_case.Testcase('TB - ', '')
# Constants/Generics that are shared between VHDL and Python
# Column layout of the lines between the markers:
# Name Value Default Description
# NOTE(review): the START/END_VHDL_GENERICS markers presumably delimit a region
# that is read by external tooling to pass the generics to the VHDL testbench;
# confirm before changing the layout of the lines in between.
# START_VHDL_GENERICS
c_nof_signal_paths = 64 # 64
c_nof_input_streams = 16 # 16
c_nof_subbands = 24 # 24
c_nof_weights = 256 # 256
c_nof_bf_units = 4 # 4
c_in_dat_w = 16 # 16
c_in_weight_w = 16 # 16
c_stat_data_w = 56 # 56
c_stat_data_sz = 2 # 2
c_bf_weights_file_name = "../../../src/hex/weights" # -- "UNUSED" or relative path to e.g. the bf/src/hex/weights hex file for adr_w=8 and dat_w=32
c_ss_wide_file_prefix = "../../../src/hex/ss_wide"
# END_VHDL_GENERICS
def _log_title_and_vhdl_generics():
    """Log the test bench title, preset the result and log the VHDL generics."""
    # Title banner as (verbosity level, text) pairs, logged in this order.
    banner = (
        (3, '>>>'),
        (1, '>>> Title : Test bench for bf_unit'),
        (3, '>>>'),
        (3, ''),
    )
    for level, text in banner:
        tc.append_log(level, text)

    # The result starts as PASSED; the verification steps set it to FAILED on a mismatch.
    tc.set_result('PASSED')

    # One log line per generic, surrounded by empty separator lines.
    report = [
        '',
        '>>> VHDL generic settings:',
        '',
        ' c_nof_signal_paths = %d' % c_nof_signal_paths,
        ' c_nof_input_streams = %d' % c_nof_input_streams,
        ' c_nof_subbands = %d' % c_nof_subbands,
        ' c_nof_weights = %d' % c_nof_weights,
        ' c_nof_bf_units = %d' % c_nof_bf_units,
        ' c_in_dat_w = %d' % c_in_dat_w,
        ' c_in_weight_w = %s' % c_in_weight_w,
        ' c_stat_data_w = %d' % c_stat_data_w,
        ' c_stat_data_sz = %d' % c_stat_data_sz,
        ' c_bf_weights_file_name = %s' % c_bf_weights_file_name,
        ' c_ss_wide_file_prefix = %s' % c_ss_wide_file_prefix,
        '',
    ]
    for text in report:
        tc.append_log(3, text)


_log_title_and_vhdl_generics()
###############################################################################
# Python specific constants
c_nof_frames = 1
# Floor division: these are counts that are used as range() limits and RAM sizes,
# so they must stay integers regardless of the division semantics of the interpreter.
c_nof_signal_paths_per_stream = c_nof_signal_paths // c_nof_input_streams
c_nof_subbands_per_stream = c_nof_subbands * c_nof_signal_paths_per_stream
c_slice_size = 2**ceil_log2(c_nof_weights)

# Block Generator
c_blocks_per_sync = 7
c_bg_nof_streams = c_nof_input_streams
c_bg_ram_size = 2**ceil_log2(c_nof_subbands_per_stream)

# Weights
c_weight_w = 10 # Use smaller weight values in order to stay within the limits of the 32-bit databuffers after summing c_nof_input_streams

# Data Buffer
c_db_nof_streams = c_nof_signal_paths
c_db_ram_size = c_slice_size

# Timing (true division is intended here, the results are times in ns)
c_dp_clk_freq_mhz = 200
c_block_period_ns = c_slice_size*1e3/c_dp_clk_freq_mhz # BSN block time in ns
c_sync_interval_ns = c_block_period_ns*c_blocks_per_sync # sync interval in ns

# Settings for the script in order to speed up the simulation.
c_gen_hex_files = True # Use True to create hex files for BG, BF weights and SS wide, use False when the files are in place.

# When the following constants are set to False, be sure there are .hex files!!!
c_write_bg = False # When set to True the waveform will be written to the block generator over the MM bus. When False this will be skipped.
c_write_bf_weights = False # When set to True the weights will be written via the MM bus. When False this will be skipped.
c_write_bf_ss_wide = False # When set to True the setting for ss_wide will be written via the MM bus. When False this will be skipped.
# Access object for the nodes selected by the test case
io = node_io.NodeIO(tc.nodeImages, tc.base_ip)

# Block generator that drives the input streams
bg = pi_diag_block_gen.PiDiagBlockGen(tc, io, c_bg_nof_streams, c_bg_ram_size)

# Data buffers that capture the real and the imaginary part of the output
db_re = pi_diag_data_buffer.PiDiagDataBuffer(
    tc,
    io,
    instanceName='REAL',
    nofStreams=c_db_nof_streams,
    ramSizePerStream=c_db_ram_size,
)
db_im = pi_diag_data_buffer.PiDiagDataBuffer(
    tc,
    io,
    instanceName='IMAG',
    nofStreams=c_db_nof_streams,
    ramSizePerStream=c_db_ram_size,
)

# DSP helpers: one for the input data width and one for the (smaller) weight width
dsp_test_bg = dsp_test.DspTest(inDatW=c_in_dat_w)
dsp_test_weights = dsp_test.DspTest(inDatW=c_weight_w)

# Beamformer peripheral
bf = pi_bf_bf.PiBfBf(
    tc,
    io,
    c_nof_weights,
    c_nof_signal_paths,
    c_nof_input_streams,
    c_stat_data_w,
    c_stat_data_sz,
)


def _log_bf_requantization_settings():
    """Log the bit widths that the beamformer object reports."""
    report = [
        '',
        '>>> BF unit requantization settings:',
        '',
        ' bf.sum_of_prod_w = %d' % bf.sum_of_prod_w,
        ' bf.sign_w = %d' % bf.sign_w,
        ' bf.unit_w = %d' % bf.unit_w,
        ' bf.prod_w = %d' % bf.prod_w,
        ' bf.sum_w = %d' % bf.sum_w,
        ' bf.bst_gain_w = %d' % bf.bst_gain_w,
        ' bf.bst_dat_w = %d' % bf.bst_dat_w,
        ' bf.bst_lsb_w = %d' % bf.bst_lsb_w,
        ' bf.out_gain_w = %d' % bf.out_gain_w,
        ' bf.out_dat_w = %d' % bf.out_dat_w,
        ' bf.out_lsb_w = %d' % bf.out_lsb_w,
        '',
    ]
    for text in report:
        tc.append_log(3, text)


_log_bf_requantization_settings()
# Functions to generate hex-files before simulation starts
def gen_data_and_hex_files_bg(gen_hex=True, sel='noise', ampl=1.0):
    """Create the block generator stimuli for all input streams.

    For each of the c_nof_input_streams streams a real and an imaginary waveform
    of c_nof_subbands_per_stream samples are created with dsp_test_bg (using seeds
    2*i and 2*i+1), quantized, and concatenated into one list of words with
    c_in_dat_w bits per part.

    Arguments:
    . gen_hex : when true, each stream is also written to ../../src/hex/tb_bg_dat_<i>.hex
    . sel     : waveform selection that is passed on to dsp_test_bg.create_waveform
    . ampl    : waveform amplitude, forced to float before use

    Return:
    . list with one concatenated data list per input stream
    """
    multiStream = []
    ampl = ampl * 1.0 # Force to float
    # The hex file depth does not depend on the stream, so determine it once
    mem_depth = 2**ceil_log2(c_nof_subbands_per_stream)
    for i in range(c_nof_input_streams):
        singleStream_real = dsp_test_bg.create_waveform(sel, ampl, seed=2*i, noiseLevel=0, length=c_nof_subbands_per_stream)
        singleStream_imag = dsp_test_bg.create_waveform(sel, ampl, seed=2*i+1, noiseLevel=0, length=c_nof_subbands_per_stream)
        singleStream_real = dsp_test_bg.quantize_waveform(singleStream_real)
        singleStream_imag = dsp_test_bg.quantize_waveform(singleStream_imag)
        singleStream = dsp_test_bg.concatenate_two_lists(singleStream_real, singleStream_imag, c_in_dat_w)
        if c_debug_print and i == 0:
            # Single-argument print(...) gives the same output with the print statement and the print function
            print("singleStream_real = %s" % singleStream_real)
        if gen_hex:
            filename = "../../src/hex/tb_bg_dat_" + str(i) + ".hex"
            mem_init_file.list_to_hex(list_in=singleStream, filename=filename, mem_width=2*c_in_dat_w, mem_depth=mem_depth)
        multiStream.append(singleStream)
    return multiStream
def gen_data_and_hex_files_bf_weights(gen_hex=True, sel='noise', ampl=1.0):
    """Create the complex beamformer weights for all signal paths.

    Per signal path a real and an imaginary list of c_nof_weights values are
    created with dsp_test_weights (seeds 2*i and 2*i+1), quantized and
    concatenated into words with c_in_weight_w bits per part.

    Arguments:
    . gen_hex : when equal to True, each signal path is also written to ../../src/hex/weights_<i>.hex
    . sel     : waveform selection that is passed on to dsp_test_weights.create_waveform
    . ampl    : amplitude, forced to float before use

    Return:
    . list with one concatenated weights list per signal path
    """
    amplitude = ampl * 1.0 # Force to float
    write_hex = (gen_hex == True)
    all_paths = []
    for path in range(c_nof_signal_paths):
        # First create both parts, then quantize both parts (real before imaginary)
        raw_re, raw_im = [
            dsp_test_weights.create_waveform(sel, amplitude, seed=2*path + part, noiseLevel=0, length=c_nof_weights)
            for part in (0, 1)
        ]
        quant_re, quant_im = [dsp_test_weights.quantize_waveform(raw) for raw in (raw_re, raw_im)]
        if c_debug_print and path == 0:
            print("singleList_real = %s" % quant_re)
        path_words = dsp_test_weights.concatenate_two_lists(quant_re, quant_im, c_in_weight_w)
        if write_hex:
            hex_name = "../../src/hex/weights_" + str(path) + ".hex"
            mem_init_file.list_to_hex(list_in=path_words, filename=hex_name, mem_width=2*c_in_weight_w, mem_depth=c_nof_weights)
        all_paths.append(path_words)
    return all_paths
def gen_data_and_hex_files_bf_ss_wide(gen_hex=True, nof_beams_per_subband=4):
    """Create the SS wide selection buffers for one bf_unit.

    Apply a simple SS wide scheme that selects nof_beams_per_subband sets of equal
    subbands per bf_unit. For SS unit i the selection line consists of the values
    i*c_nof_subbands + j for j = 0..nof_beams_per_subband-1, where each value is
    repeated c_nof_weights/nof_beams_per_subband times, so every line has
    c_nof_weights entries.

    Arguments:
    . gen_hex               : when true, each line is also written to ../../src/hex/ss_wide_<i>.hex
    . nof_beams_per_subband : number of different subbands that is selected per SS unit
      - must be <= c_nof_subbands, because that is the maximum number of
        different subbands that is available
      - must be a divider of c_nof_weights

    Return:
    . list with one selection list per SS unit (c_nof_signal_paths_per_stream lists)

    Raise:
    . ValueError when nof_beams_per_subband does not meet the constraints above
    """
    if nof_beams_per_subband < 1 or nof_beams_per_subband > c_nof_subbands:
        raise ValueError("nof_beams_per_subband = %s must be in range 1..%d (c_nof_subbands)" % (nof_beams_per_subband, c_nof_subbands))
    if c_nof_weights % nof_beams_per_subband != 0:
        raise ValueError("nof_beams_per_subband = %s must be a divider of c_nof_weights = %d" % (nof_beams_per_subband, c_nof_weights))

    # Floor division keeps the repeat count an integer that is usable as list multiplier
    nof_repeats = c_nof_weights // nof_beams_per_subband
    mem_width = ceil_log2(c_nof_subbands_per_stream)

    select_buf = []
    for i in range(c_nof_signal_paths_per_stream): # iterates over the number of single ss units
        select_buf_line = []
        for j in range(nof_beams_per_subband):
            select_buf_line.extend([i*c_nof_subbands + j] * nof_repeats)
        if gen_hex:
            filename = "../../src/hex/ss_wide_" + str(i) + ".hex"
            mem_init_file.list_to_hex(list_in=select_buf_line, filename=filename, mem_width=mem_width, mem_depth=c_nof_weights)
        select_buf.append(select_buf_line)
    return select_buf
if __name__ == "__main__":
    # Test flow:
    # 1. Create the BG stimuli, the SS wide selections and the BF weights (and
    #    their hex files when c_gen_hex_files is set).
    # 2. Calculate the expected raw beamlets and the expected beamlet statistics.
    # 3. Write selections, weights and waveforms over the MM bus, but only when
    #    the corresponding c_write_* constant is set.
    # 4. Enable the block generator and wait until the data buffer is filled.
    # 5. Verify the captured beamlets and the beamlet statistics (BST).
    # 6. Log the result and exit with a status that reflects it.

    ###############################################################################
    #
    # Create stimuli for the BG
    #
    ###############################################################################
    # Prepare x stimuli for block generator
    x_complex_arr = []
    x_re_arr = []
    x_im_arr = []
    bg_data_arr = gen_data_and_hex_files_bg(c_gen_hex_files)
    for i in range(c_nof_input_streams):
        dataListComplex = bg.convert_concatenated_to_complex(bg_data_arr[i], c_in_dat_w)
        x_complex_arr.append(dataListComplex)
        [x_re_list, x_im_list] = split_complex(dataListComplex)
        x_re_arr.append(x_re_list)
        x_im_arr.append(x_im_list)

    ###############################################################################
    #
    # Create settings for ss_wide in bf_unit
    #
    ###############################################################################
    select_buf = gen_data_and_hex_files_bf_ss_wide(c_gen_hex_files)

    ###############################################################################
    #
    # Create weightfactors
    #
    ###############################################################################
    weight_complex_arr = []
    weight_re_arr = []
    weight_im_arr = []
    generated_weights = gen_data_and_hex_files_bf_weights(c_gen_hex_files)
    for j in range(c_nof_signal_paths):
        weightsSignalPathComplex = bg.convert_concatenated_to_complex(generated_weights[j], c_in_weight_w)
        weight_complex_arr.append(weightsSignalPathComplex)
        [weight_re_list, weight_im_list] = split_complex(weightsSignalPathComplex)
        weight_re_arr.append(weight_re_list)
        weight_im_arr.append(weight_im_list)
        if c_debug_print and j == 0:
            # Single-argument print(...) gives the same output with the print statement and the print function
            print("generated_weights = %s" % generated_weights[j])
            print("weightsSignalPathComplex = %s" % weightsSignalPathComplex)
            print("weight_re_list = %s" % weight_re_list)

    ###############################################################################
    #
    # Calculate expected results
    #
    ###############################################################################
    # Determine the data input for the multipliers: per input stream one selected
    # list per SS unit, so c_nof_signal_paths lists in total.
    bf_in_re = []
    bf_in_im = []
    for i in range(c_nof_input_streams):
        for j in range(c_nof_signal_paths_per_stream):
            bf_in_re.append(bf.ss_wide[i].subband_select(x_re_arr[i], select_buf[j]))
            bf_in_im.append(bf.ss_wide[i].subband_select(x_im_arr[i], select_buf[j]))

    # Calculate the expected raw beamlets: per weight index the sum over all
    # signal paths of the complex product data * weight.
    exp_beamlets_raw = []
    for i in range(c_nof_weights):
        beamlet = complex(0, 0)
        for j in range(c_nof_signal_paths):
            beamlet = beamlet + complex(bf_in_re[j][i], bf_in_im[j][i]) * complex(weight_re_arr[j][i], weight_im_arr[j][i])
        exp_beamlets_raw.append(beamlet)
    if c_debug_print:
        print("exp_beamlets_raw = %s" % exp_beamlets_raw)

    # Calculate expected beamlet statistics
    bf_unit_stats_exp = bf.calculate_bf_unit_statistics(x_complex_arr, select_buf, weight_complex_arr, c_blocks_per_sync, use_16b=True)
    if c_debug_print:
        print("bf_unit_stats_exp = %s" % bf_unit_stats_exp)

    ###############################################################################
    #
    # Write the selection buffers
    #
    ###############################################################################
    if c_write_bf_ss_wide:
        for i in range(c_nof_input_streams):
            bf.ss_wide[i].write_selects(flatten(select_buf))

    ###############################################################################
    #
    # Write the weights
    #
    ###############################################################################
    if c_write_bf_weights:
        for i in range(c_nof_signal_paths):
            bf.write_weights(generated_weights[i], i)
            # NOTE(review): read back of weight 0 is assumed to be done per signal path - confirm
            bf.read_weight(i, weightNr=0)

    ###############################################################################
    #
    # Write data and settings to block generator
    #
    ###############################################################################
    # Write setting for the block generator:
    bg.write_block_gen_settings(samplesPerPacket=c_nof_subbands_per_stream, blocksPerSync=c_blocks_per_sync, gapSize=160, memLowAddr=0, memHighAddr=c_nof_subbands_per_stream-1, BSNInit=0)

    # Write the stimuli to the block generator (only when requested, otherwise the
    # waveform RAM content comes from the hex files) and enable the block generator
    if c_write_bg:
        for i in range(c_nof_input_streams):
            bg.write_waveform_ram(data=bg_data_arr[i], channelNr=i)
    bg.write_enable()

    # Poll the databuffer to check if the response is there.
    # Retry after 3 seconds so we don't issue too many MM reads in case of simulation.
    do_until_ge(db_re.read_nof_words, ms_retry=3000, val=c_nof_weights, s_timeout=3600)

    ###############################################################################
    #
    # Read and verify beamlet output from data buffer
    #
    ###############################################################################
    db_out_re = []
    db_out_im = []
    db_out_re.append(flatten(db_re.read_data_buffer(streamNr=0, n=c_nof_weights, radix='dec', width=32, nofColumns=8)))
    db_out_im.append(flatten(db_im.read_data_buffer(streamNr=0, n=c_nof_weights, radix='dec', width=32, nofColumns=8)))
    db_out_re = flatten(db_out_re)
    db_out_im = flatten(db_out_im)

    # Compare every captured beamlet with its expected value; the real and the
    # imaginary part are checked and counted separately.
    err = 0
    for i in range(c_nof_weights):
        if db_out_re[i] != exp_beamlets_raw[i].real:
            tc.append_log(3, str(i) + " Read: db_out_re[i] = " + str(db_out_re[i]) + " Expected: exp_beamlets_raw[i].real = " + str(exp_beamlets_raw[i].real))
            tc.set_result('FAILED')
            err += 1
        if db_out_im[i] != exp_beamlets_raw[i].imag:
            tc.append_log(3, str(i) + " Read: db_out_im[i] = " + str(db_out_im[i]) + " Expected: exp_beamlets_raw[i].imag = " + str(exp_beamlets_raw[i].imag))
            tc.set_result('FAILED')
            err += 1
    tc.append_log(3, "number of errors: %d" % err)

    ###############################################################################
    #
    # Read and verify BST
    #
    ###############################################################################
    # Run the simulator into the second sync interval (1.5 * c_sync_interval_ns)
    # before the beamlet statistics are read.
    # NOTE(review): presumably this ensures that the statistics of the first
    # complete sync interval are available - confirm.
    do_until_gt(io.simIO.getSimTime, c_sync_interval_ns*1.5, s_timeout=3600)
    bf_unit_stats_read = flatten(bf.st.read_stats(vLevel=3))
    if bf_unit_stats_exp != bf_unit_stats_read:
        tc.append_log(3, "Unexpected read BF statistics for blocks_per_sync = %d" % c_blocks_per_sync)
        tc.append_log(3, "Expected: %s" % bf_unit_stats_exp)
        tc.append_log(3, "Read: %s" % bf_unit_stats_read)
        tc.set_result('FAILED')

    ###############################################################################
    # End
    tc.set_section_id('')
    tc.append_log(3, '')
    tc.append_log(3, '>>>')
    tc.append_log(0, '>>> Test bench result: %s' % tc.get_result())
    tc.append_log(3, '>>>')

    # sys.exit() with a string argument prints the string to stderr and always
    # exits with status 1, so also for 'PASSED'. Keep the message on stderr, but
    # exit with status 0 when the test passed and with status 1 otherwise.
    result = tc.get_result()
    sys.stderr.write('%s\n' % result)
    sys.exit(0 if result == 'PASSED' else 1)