Skip to content
Snippets Groups Projects
Commit 38555f61 authored by Pepping's avatar Pepping
Browse files

Initial commit

parent 955204be
Branches
No related tags found
No related merge requests found
#! /usr/bin/env python
###############################################################################
#
# Copyright (C) 2012
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
"""Test case for the reorder_transpose entity.
Description:
Usage:
> python tc_reorder_transpose.py --unb 0 --fn 0 --sim
"""
###############################################################################
# System imports
import test_case
import node_io
import unb_apertif as apr
import pi_diag_block_gen
import pi_diag_data_buffer
import pi_ss_ss_wide
import pi_bf_bf
import pi_io_ddr
import dsp_test
import sys, os
import subprocess
import time
import pylab as pl
import numpy as np
import scipy as sp
import random
from tools import *
from common import *
import mem_init_file
###############################################################################
# Create a test case object
tc = test_case.Testcase('TB - ', '')
# Constants/Generics that are shared between VHDL and Python
# Name Value Default Description
# START_VHDL_GENERICS
g_nof_signal_paths = 64 # 64
g_nof_input_streams = 16 # 16
g_nof_subbands = 24 # 24
g_nof_weights = 256 # 256
g_nof_bf_units = 4 # 4
g_in_dat_w = 16 # 16
g_in_weight_w = 16 # 16
g_blocks_per_sync = 32 # 781250
g_wr_chunksize = 240
g_rd_chunksize = 16
g_rd_nof_chunks = 15
g_rd_interval = 16
g_gapsize = 0
# END_VHDL_GENERICS
# Overwrite generics with argumented generics from autoscript or command line.
#if tc.generics != None:
# g_wr_chunksize = tc.generics['g_wr_chunksize']
# g_rd_chunksize = tc.generics['g_rd_chunksize']
# g_rd_nof_chunks = tc.generics['g_rd_nof_chunks']
# g_rd_interval = tc.generics['g_rd_interval']
# g_gapsize = tc.generics['g_gapsize']
# g_nof_blocks = tc.generics['g_nof_blocks']
# g_frame_size_in = tc.generics['g_frame_size_in']
# g_frame_size_out = tc.generics['g_frame_size_out']
# Define settings for the block generator
c_bg_nof_streams = g_nof_input_streams
c_nof_sp_per_input_stream = g_nof_signal_paths / g_nof_input_streams
c_nof_subbands_per_stream = g_nof_subbands*c_nof_sp_per_input_stream
c_bg_ram_size = c_nof_subbands_per_stream
c_samples_per_packet = c_nof_sp_per_input_stream * g_nof_subbands
c_gapsize = g_nof_weights - c_samples_per_packet
c_mem_low_addr = 0
c_mem_high_addr = c_samples_per_packet-1
c_bsn_init = 42
# Define stuff for transpose
c_nof_int_streams = 1
c_ena_pre_transpose = True
c_bf_in_dat_w = 16
c_tp_in_dat_w = 8
tc.append_log(3, '>>>')
tc.append_log(1, '>>> Title : Test script for apertif_unb1_fn_beamformer_tp_bg' )
tc.append_log(3, '>>>')
tc.append_log(3, '')
tc.set_result('PASSED')
# Create access object for nodes
io = node_io.NodeIO(tc.nodeImages, tc.base_ip)
# Create block generator instance
bg = pi_diag_block_gen.PiDiagBlockGen(tc, io, g_nof_input_streams, c_bg_ram_size)
# Create instances for the beamformer units (BF)
bf=[]
for i in range(tc.nofFnNodes):
for j in xrange(g_nof_bf_units):
bf.append(pi_bf_bf.PiBfBf(tc, io, g_nof_weights, g_nof_signal_paths, g_nof_input_streams, xstEnable=True, instanceNr=j, nodeNr=tc.nodeFnNrs[i]))
# Create subandselect instance for pre-transpose.
ss = pi_ss_ss_wide.PiSsSsWide (tc, io, g_wr_chunksize*g_rd_chunksize, c_nof_int_streams)
# Create object for DDR register map
ddr = pi_io_ddr.PiIoDdr(tc, io, nof_inst = 1)
# Create dsp_test instance for helpful methods
dsp_test_bg = dsp_test.DspTest(inDatW=c_bf_in_dat_w)
# Function for generating stimuli and generating hex files.
def gen_bg_hex_files(c_framesize = 64, c_nof_frames = 32, c_nof_streams = 4):
data = []
for i in range(c_nof_streams):
stream_re = []
stream_im = []
for j in range(c_nof_frames):
for k in range(c_framesize):
stream_re.append(k)
stream_im.append(j)
data_concat = dsp_test_bg.concatenate_two_lists(stream_re, stream_im, c_tp_in_dat_w)
data.append(data_concat)
filename = "../../src/hex/tb_bg_dat_" + str(i) + ".hex"
mem_init_file.list_to_hex(list_in=data_concat, filename=filename, mem_width=c_nof_complex*c_tp_in_dat_w, mem_depth=2**(ceil_log2(c_bg_ram_size)))
return data
if __name__ == "__main__":
################################################################################
##
## Initialize the blockgenerators
##
################################################################################
# - Write settings to the block generator
tc.append_log(3, '>>>')
tc.append_log(3, '>>> Write settings to the block generator')
tc.append_log(3, '>>>')
bg.write_block_gen_settings(c_samples_per_packet, g_blocks_per_sync, c_gapsize, c_mem_low_addr, c_mem_high_addr, c_bsn_init)
# - Create a list with the input data and write it to the RAMs of the block generator
tc.append_log(3, '>>>')
tc.append_log(3, '>>> Write data to the waveform RAM of all channels')
tc.append_log(3, '>>>')
inputData = []
for i in xrange(g_nof_input_streams):
dataList = bg.generate_data_list(c_nof_sp_per_input_stream, g_nof_subbands, 2048*i*4, i, c_bf_in_dat_w)
# bg.write_waveform_ram(dataList, i)
filename = "../../src/hex/bg_in_data_" + str(i) + ".hex"
mem_init_file.list_to_hex(list_in=dataList, filename=filename, mem_width=c_nof_complex*c_bf_in_dat_w, mem_depth=2**(ceil_log2(c_bg_ram_size)))
dataListComplex = bg.convert_concatenated_to_complex(dataList, c_bf_in_dat_w)
inputData.append(dataListComplex)
################################################################################
##
## Create and Write the weight factors
##
################################################################################
tc.append_log(3, '>>>')
tc.append_log(3, '>>> Create and write weightfactors for all signal paths on all bf_units ')
tc.append_log(3, '>>>')
weightsNodes = []
for k in xrange(tc.nofFnNodes):
weightsBf = []
for i in range(g_nof_bf_units):
weightsBfUnit=[]
for j in range(g_nof_signal_paths):
weightsSignalPath = bf[k*g_nof_bf_units+i].generate_weights(g_nof_weights, i+j, i, g_in_weight_w)
filename = "../../src/hex/bf_weights_" + str(i) + "_" + str(j) + ".hex"
mem_init_file.list_to_hex(list_in=weightsSignalPath, filename=filename, mem_width=c_nof_complex*g_in_weight_w, mem_depth=g_nof_weights)
# bf[k*g_nof_bf_units+i].write_weights(weightsSignalPath, j)
weightsSignalPathComplex = bg.convert_concatenated_to_complex(weightsSignalPath, g_in_weight_w)
weightsBfUnit.append(weightsSignalPathComplex)
weightsBf.append(weightsBfUnit)
weightsNodes.append(weightsBf)
################################################################################
##
## Create and Write the selection buffers
##
################################################################################
select_buf = []
for b in xrange(g_nof_bf_units):
for i in range(c_nof_sp_per_input_stream):
select_buf_line = []
for j in range(4):
for k in range(g_nof_weights/4):
select_buf_line.append(i*g_nof_subbands + j)
select_buf.append(select_buf_line)
filename = "../../src/hex/bf_ss_wide_" + str(b) + "_" + str(i) + ".hex"
mem_init_file.list_to_hex(list_in=flatten(select_buf_line), filename=filename, mem_width=ceil_log2(c_nof_subbands_per_stream), mem_depth=g_nof_weights)
print len(flatten(select_buf))
# for i in range(tc.nofFnNodes):
# for j in xrange(g_nof_bf_units):
# for k in range(g_nof_input_streams):
## bf[i*g_nof_bf_units + j].ss_wide[k].write_selects(flatten(select_buf));
# - Enable the block generator
tc.append_log(3, '>>>')
tc.append_log(3, '>>> Enable the block generator')
tc.append_log(3, '>>>')
tc.append_log(3, '')
bg.write_enable()
###############################################################################
#
# Create setting for the pre-transpose (subbandselect)
#
###############################################################################
ss_list = []
for i in range(g_wr_chunksize):
for j in range(g_rd_chunksize):
ss_list.append(i + j*g_nof_weights)
if c_ena_pre_transpose:
ss.write_selects(ss_list)
###############################################################################
#
# Create stimuli for the BG
#
###############################################################################
# Prepare x stimuli for block generator
# bg_data = gen_bg_hex_files(g_frame_size_in, g_nof_blocks, c_bg_nof_streams)
################################################################################
##
## Write data and settings to block generator
##
################################################################################
# Write setting for the block generator:
# bg.write_block_gen_settings(samplesPerPacket=g_frame_size_in, blocksPerSync=g_nof_blocks, gapSize=c_gap_size, memLowAddr=0, memHighAddr=c_bg_ram_size-1, BSNInit=10)
# Write the stimuli to the block generator and enable the block generator
# for i in range(c_bg_nof_streams):
# bg.write_waveform_ram(data=bg_data[i], channelNr= i)
# Concatenate all channels
# t=2
# while len(bg_data) > 1:
# concat_data = []
# for i in range(len(bg_data)/2):
# concat_data.append(dsp_test_bg.concatenate_two_lists(bg_data[2*i], bg_data[2*i+1], c_in_dat_w*t))
# bg_data = concat_data
# t=t*2
#
# bg_data = flatten(bg_data)
# Wait until the DDR3 model is initialized.
if tc.sim == True:
do_until_eq(ddr.read_init_done, ms_retry=1000, val=1, s_timeout=13600) # 110000
# Enable the blockgenerator
bg.write_enable()
if(c_force_late_sync == 1):
do_until_gt(io.simIO.getSimTime, ms_retry=1000, val=180000, s_timeout=13600) # 110000
bg.write_block_gen_settings(samplesPerPacket=g_frame_size_in, blocksPerSync=g_nof_blocks+1, gapSize=c_gap_size, memLowAddr=0, memHighAddr=c_bg_ram_size-1, BSNInit=10)
elif(c_force_early_sync == 1):
do_until_gt(io.simIO.getSimTime, ms_retry=1000, val=180000, s_timeout=13600) # 110000
bg.write_block_gen_settings(samplesPerPacket=g_frame_size_in, blocksPerSync=g_nof_blocks-1, gapSize=c_gap_size, memLowAddr=0, memHighAddr=c_bg_ram_size-1, BSNInit=10)
###############################################################################
#
# Calculate reference data
#
###############################################################################
# Subband Select pre-transpose
if c_ena_pre_transpose:
bg_data_ss =[]
for i in range(len(bg_data)/c_ss_pagesize): # len(ss_list)):
bg_data_ss.append(ss.subband_select(bg_data[i*c_ss_pagesize:(i+1)*c_ss_pagesize], ss_list))
bg_data = bg_data_ss
bg_data = flatten(bg_data)
ref_data_total = []
# Check how many data there is and how many pages will be used:
for t in range(len(bg_data)/c_pagesize):
bg_data_single_page = bg_data[t*c_pagesize:(t+1)*c_pagesize]
# Write to memory
mem_page = [0] * c_pagesize
for i in range(g_nof_blocks):
for j in range(g_wr_chunksize):
mem_page[i*c_blocksize + j] = bg_data_single_page[i*g_wr_chunksize + j]
# Read from memory
ref_data = [0] * g_nof_blocks * g_rd_nof_chunks * g_rd_chunksize
rd_block_offset = 0
rd_chunk_offset = 0
for i in range(g_nof_blocks*g_rd_nof_chunks):
rd_offset = rd_block_offset + rd_chunk_offset
for k in range(g_rd_chunksize):
ref_data[i*g_rd_chunksize + k] = mem_page[rd_offset + k]
rd_block_offset = rd_block_offset + c_rd_increment
if(rd_block_offset >= c_pagesize):
rd_chunk_offset = rd_chunk_offset + g_rd_chunksize
rd_block_offset = rd_block_offset - c_pagesize
ref_data_total.append(ref_data)
ref_data_total=flatten(ref_data_total)
# Split the data again in individual channels
ref_data_split = []
ref_data_split.append(ref_data_total)
t = c_bg_nof_streams
while len(ref_data_split) < c_bg_nof_streams:
ref_data_temp = []
for i in range(len(ref_data_split)):
[data_a, data_b] = dsp_test_bg.split_in_two_lists(ref_data_split[i], c_in_dat_w*t)
ref_data_temp.append(data_a)
ref_data_temp.append(data_b)
ref_data_split = ref_data_temp
t = t/2
# Split the data in real and imaginary
ref_data_re = []
ref_data_im = []
for i in range(c_bg_nof_streams):
[data_re, data_im] = dsp_test_bg.split_in_two_lists(ref_data_split[i], c_in_dat_w)
ref_data_re.append(data_re)
ref_data_im.append(data_im)
# Poll the databuffer to check if the response is there.
# Retry after 3 seconds so we don't issue too many MM reads in case of simulation.
do_until_ge(db_re.read_nof_words, ms_retry=3000, val=c_db_ram_size, s_timeout=3600)
###############################################################################
#
# Read transposed data from data buffer
#
###############################################################################
db_out_re = []
db_out_im = []
for i in range(c_bg_nof_streams):
db_out_re.append(flatten(db_re.read_data_buffer(streamNr=i, n=c_db_ram_size, radix='uns', width=c_in_dat_w, nofColumns=8)))
db_out_im.append(flatten(db_im.read_data_buffer(streamNr=i, n=c_db_ram_size, radix='uns', width=c_in_dat_w, nofColumns=8)))
###############################################################################
#
# Verify output data
#
###############################################################################
for i in range(c_bg_nof_streams):
for j in range(len(ref_data_re[0])):
if db_out_re[i][j] != ref_data_re[i][j]:
tc.append_log(2, 'Error in real output data. Expected data: %d Data read: %d Iteration nr: %d %d' % (ref_data_re[i][j], db_out_re[i][j], i, j))
tc.set_result('FAILED')
if db_out_im[i][j] != ref_data_im[i][j]:
tc.append_log(2, 'Error in imag output data. Expected data: %d Data read: %d Iteration nr: %d %d' % (ref_data_im[i][j], db_out_im[i][j], i, j))
tc.set_result('FAILED')
###############################################################################
# End
tc.set_section_id('')
tc.append_log(3, '')
tc.append_log(3, '>>>')
tc.append_log(0, '>>> Test bench result: %s' % tc.get_result())
tc.append_log(3, '>>>')
sys.exit(tc.get_result())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment