#! /usr/bin/env python
###############################################################################
#
# Copyright (C) 2012
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
"""Test case for the unb1_reorder design.
Description:
Usage:
> python tc_unb1_reorder.py --unb 0 --bn 3 --sim
"""
###############################################################################
# System imports
import test_case
import node_io
import unb_apertif as apr
import pi_diag_block_gen
import pi_diag_data_buffer
import pi_ss_ss_wide
import dsp_test
import pi_io_ddr
import pi_bsn_monitor
import sys, os
import subprocess
import time
import pylab as pl
import numpy as np
import scipy as sp
import random
from tools import *
from common import *
import mem_init_file
###############################################################################
# Create a test case object
tc = test_case.Testcase('TB - ', '')
# Constants/Generics that are shared between VHDL and Python
# Name Value Default Description
# START_VHDL_GENERICS
g_wr_chunksize = 256
g_rd_chunksize = 32
g_rd_nof_chunks = 8
g_rd_interval = 32
g_gapsize = 0
g_nof_blocks = 32
# END_VHDL_GENERICS
# Overwrite generics with values passed in from the autoscript or the command line.
if tc.generics is not None:
    g_wr_chunksize = tc.generics['g_wr_chunksize']
    g_rd_chunksize = tc.generics['g_rd_chunksize']
    g_rd_nof_chunks = tc.generics['g_rd_nof_chunks']
    g_rd_interval = tc.generics['g_rd_interval']
    g_gapsize = tc.generics['g_gapsize']
    g_nof_blocks = tc.generics['g_nof_blocks']
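# Derived constants: a block is one write chunk plus the gap behind it, a page
# holds g_nof_blocks blocks, and c_rd_increment is the address step between the
# start addresses of two consecutive read chunks.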
c_blocksize = (g_wr_chunksize + g_gapsize)
c_pagesize = c_blocksize * g_nof_blocks
c_rd_increment = g_rd_interval * c_blocksize
c_bg_nof_streams = 4
c_bg_ram_size = g_wr_chunksize * g_nof_blocks
c_in_dat_w = 8
c_db_nof_streams = c_bg_nof_streams
c_db_ram_size = c_bg_ram_size
c_frame_size = g_wr_chunksize
c_nof_int_streams = 1
c_ena_pre_transpose = True
c_gap_size = 0 #g_rd_chunksize
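# c_force_late_sync / c_force_early_sync make the test rewrite the block generator
# settings after DDR initialization with blocksPerSync one higher or one lower, to
# exercise late and early sync handling.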
c_force_late_sync = 0
c_force_early_sync = 0
c_nof_bsn_streams = 4
c_write_block_gen = True
tc.append_log(3, '>>>')
tc.append_log(1, '>>> Title : Test script for reorder_transpose' )
tc.append_log(3, '>>>')
tc.append_log(3, '')
tc.set_result('PASSED')
# Create access object for nodes
io = node_io.NodeIO(tc.nodeImages, tc.base_ip)
# Create block generator instance
bg = pi_diag_block_gen.PiDiagBlockGen(tc, io, c_bg_nof_streams, c_bg_ram_size, instanceName='DDR')
# Create databuffer instances
#db_re = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'RE', nofStreams=c_db_nof_streams, ramSizePerStream=c_db_ram_size)
#db_im = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'IM', nofStreams=c_db_nof_streams, ramSizePerStream=c_db_ram_size)
db_re = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'DDR', nofStreams=c_db_nof_streams, ramSizePerStream=c_db_ram_size)
db_im = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'DDR', nofStreams=c_db_nof_streams, ramSizePerStream=c_db_ram_size)
# Create subandselect instance for pre-transpose.
ss = pi_ss_ss_wide.PiSsSsWide (tc, io, c_frame_size*g_rd_chunksize, c_nof_int_streams)
# Create object for DDR register map
ddr = pi_io_ddr.PiIoDdr(tc, io, nof_inst = 1)
# BSN monitor
bsn = pi_bsn_monitor.PiBsnMonitor(tc, io, instanceName='DDR', nofStreams=c_nof_bsn_streams)
# Create dsp_test instance for helpful methods
dsp_test_bg = dsp_test.DspTest(inDatW=c_in_dat_w)
# Function for generating stimuli and writing them to hex files.
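# Each stream carries a ramp pattern: the real part counts 0..c_framesize-1 within
# every frame and the imaginary part holds the frame index, so every sample encodes
# its frame number and its position within the frame.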
def gen_bg_hex_files(c_framesize=64, c_nof_frames=32, c_nof_streams=4):
    data = []
    for i in range(c_nof_streams):
        stream_re = []
        stream_im = []
        for j in range(c_nof_frames):
            for k in range(c_framesize):
                stream_re.append(k)
                stream_im.append(j)
        data_concat = dsp_test_bg.concatenate_two_lists(stream_re, stream_im, c_in_dat_w)
        data.append(data_concat)
        filename = "../../src/hex/tb_bg_dat_" + str(i) + ".hex"
        mem_init_file.list_to_hex(list_in=data_concat, filename=filename, mem_width=c_nof_complex*c_in_dat_w, mem_depth=2**(ceil_log2(c_bg_ram_size)))
    return data
if __name__ == "__main__":
    for i in range(c_nof_bsn_streams):
        bsn.read_bsn_monitor(i)
    print ddr.read_init_done()
    print ddr.read_usedw_rd_fifo()
    print ddr.read_wait_request_n()
    print ddr.read_cal_success()
    print ddr.read_cal_fail()
###############################################################################
#
# Create setting for the pre-transpose (subbandselect)
#
###############################################################################
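# The select list implements the pre-transpose: within each input block of
# g_rd_chunksize frames of c_frame_size samples, output word i*g_rd_chunksize + j
# selects input word i + j*c_frame_size, i.e. the same sample index is gathered
# from g_rd_chunksize consecutive frames.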
ss_list = []
for i in range(c_frame_size):
    for j in range(g_rd_chunksize):
        ss_list.append(i + j*c_frame_size)
if c_ena_pre_transpose:
    ss.write_selects(ss_list)
###############################################################################
#
# Create stimuli for the BG
#
###############################################################################
# Prepare the stimuli for the block generator
bg_data = gen_bg_hex_files(c_frame_size, g_nof_blocks, c_bg_nof_streams)
################################################################################
##
## Write data and settings to block generator
##
################################################################################
# Write setting for the block generator:
bg.write_block_gen_settings(samplesPerPacket=c_frame_size, blocksPerSync=g_nof_blocks, gapSize=c_gap_size, memLowAddr=0, memHighAddr=c_bg_ram_size-1, BSNInit=10)
# Write the stimuli to the block generator and enable the block generator
if c_write_block_gen == True:
    for i in range(c_bg_nof_streams):
        bg.write_waveform_ram(data=bg_data[i], channelNr=i)
# Concatenate all channels
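# The streams are concatenated pairwise: every pass merges two lists into one with
# double the word width (t tracks the width multiplier), so after the loop bg_data
# is a single list of wide words as written into the DDR.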
t = 2
while len(bg_data) > 1:
    concat_data = []
    for i in range(len(bg_data)/2):
        concat_data.append(dsp_test_bg.concatenate_two_lists(bg_data[2*i], bg_data[2*i+1], c_in_dat_w*t))
    bg_data = concat_data
    t = t*2
bg_data = flatten(bg_data)
# Enable the blockgenerator
bg.write_enable()
# Read back the setting for the block generator to check that the BG enable has reached the dp_clk domain.
bg.read_block_gen_settings()
# Wait until the DDR3 model is initialized.
if tc.sim == True:
    do_until_eq(ddr.read_init_done, ms_retry=1000, val=1, s_timeout=13600)  # 110000
if c_force_late_sync == 1:
    if tc.sim == True:
        do_until_gt(io.simIO.getSimTime, ms_retry=1000, val=180000, s_timeout=13600)  # 110000
    bg.write_block_gen_settings(samplesPerPacket=c_frame_size, blocksPerSync=g_nof_blocks+1, gapSize=c_gap_size, memLowAddr=0, memHighAddr=c_bg_ram_size-1, BSNInit=10)
elif c_force_early_sync == 1:
    if tc.sim == True:
        do_until_gt(io.simIO.getSimTime, ms_retry=1000, val=180000, s_timeout=13600)  # 110000
    bg.write_block_gen_settings(samplesPerPacket=c_frame_size, blocksPerSync=g_nof_blocks-1, gapSize=c_gap_size, memLowAddr=0, memHighAddr=c_bg_ram_size-1, BSNInit=10)
###############################################################################
#
# Calculate reference data
#
###############################################################################
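# The reference model mimics the DDR reorder: each page is filled with g_nof_blocks
# write chunks of g_wr_chunksize words (with g_gapsize unused words behind each
# chunk) and read back in chunks of g_rd_chunksize words, stepping c_rd_increment
# addresses per read and adding one extra chunk offset every time the read address
# wraps around the page. With the default generics at the top of this file this
# gives c_blocksize = 256, c_pagesize = 8192 and c_rd_increment = 8192, so the read
# address wraps after every chunk.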
# Subband Select pre-transpose
if c_ena_pre_transpose:
    bg_data_ss = []
    for i in range(len(bg_data)/len(ss_list)):
        bg_data_ss.append(ss.subband_select(bg_data[i*len(ss_list):(i+1)*len(ss_list)], ss_list))
    bg_data = bg_data_ss
    bg_data = flatten(bg_data)
ref_data_total = []
# Check how much data there is and how many pages will be used:
for t in range(len(bg_data)/c_pagesize):
    bg_data_single_page = bg_data[t*c_pagesize:(t+1)*c_pagesize]
    # Write to memory
    mem_page = [0] * c_pagesize
    for i in range(g_nof_blocks):
        for j in range(g_wr_chunksize):
            mem_page[i*c_blocksize + j] = bg_data_single_page[i*g_wr_chunksize + j]
    # Read from memory
    ref_data = [0] * g_nof_blocks * g_rd_nof_chunks * g_rd_chunksize
    rd_block_offset = 0
    rd_chunk_offset = 0
    for i in range(g_nof_blocks*g_rd_nof_chunks):
        rd_offset = rd_block_offset + rd_chunk_offset
        for k in range(g_rd_chunksize):
            ref_data[i*g_rd_chunksize + k] = mem_page[rd_offset + k]
        rd_block_offset = rd_block_offset + c_rd_increment
        if rd_block_offset >= c_pagesize:
            rd_chunk_offset = rd_chunk_offset + g_rd_chunksize
            rd_block_offset = rd_block_offset - c_pagesize
    ref_data_total.append(ref_data)
ref_data_total=flatten(ref_data_total)
# Split the data again in individual channels
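# Reverse of the concatenation above: repeatedly split the wide words in two,
# halving the width each pass, until one list per block generator stream remains.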
ref_data_split = []
ref_data_split.append(ref_data_total)
t = c_bg_nof_streams
while len(ref_data_split) < c_bg_nof_streams:
    ref_data_temp = []
    for i in range(len(ref_data_split)):
        [data_a, data_b] = dsp_test_bg.split_in_two_lists(ref_data_split[i], c_in_dat_w*t)
        ref_data_temp.append(data_a)
        ref_data_temp.append(data_b)
    ref_data_split = ref_data_temp
    t = t/2
# Split the data in real and imaginary
ref_data_re = []
ref_data_im = []
for i in range(c_bg_nof_streams):
    [data_re, data_im] = dsp_test_bg.split_in_two_lists(ref_data_split[i], c_in_dat_w)
    ref_data_re.append(data_re)
    ref_data_im.append(data_im)
# Poll the databuffer to check if the response is there.
# Retry after 3 seconds so we don't issue too many MM reads in case of simulation.
do_until_ge(db_re.read_nof_words, ms_retry=3000, val=c_db_ram_size, s_timeout=3600)
###############################################################################
#
# Read fifo usage
#
###############################################################################
print ddr.read_usedw_rd_fifo()
print ddr.read_usedw_wr_fifo()
###############################################################################
#
# Read transposed data from data buffer
#
###############################################################################
db_out_re = []
db_out_im = []
for i in range(c_bg_nof_streams):
    db_out_re.append(flatten(db_re.read_data_buffer(streamNr=i, n=c_db_ram_size, radix='uns', width=c_in_dat_w, nofColumns=8)))
    db_out_im.append(flatten(db_im.read_data_buffer(streamNr=i, n=c_db_ram_size, radix='uns', width=c_in_dat_w, nofColumns=8)))
###############################################################################
#
# Verify output data
#
###############################################################################
for i in range(c_bg_nof_streams):
    for j in range(c_db_ram_size):
        if db_out_re[i][j] != ref_data_re[i][j]:
            tc.append_log(2, 'Error in real output data. Expected data: %d Data read: %d Iteration nr: %d %d' % (ref_data_re[i][j], db_out_re[i][j], i, j))
            tc.set_result('FAILED')
        if db_out_im[i][j] != ref_data_im[i][j]:
            tc.append_log(2, 'Error in imag output data. Expected data: %d Data read: %d Iteration nr: %d %d' % (ref_data_im[i][j], db_out_im[i][j], i, j))
            tc.set_result('FAILED')
###############################################################################
#
# Read out BSN monitor
#
###############################################################################
for i in range(c_nof_bsn_streams):
    bsn.read_bsn_monitor(i)
###############################################################################
# End
###############################################################################
tc.set_section_id('')
tc.append_log(3, '')
tc.append_log(3, '>>>')
tc.append_log(0, '>>> Test bench result: %s' % tc.get_result())
tc.append_log(3, '>>>')
sys.exit(tc.get_result())