Skip to content
Snippets Groups Projects
Commit 6f336d49 authored by Pepping's avatar Pepping
Browse files

Initial commit

parent ff28cd37
No related branches found
No related tags found
No related merge requests found
#! /usr/bin/env python
###############################################################################
#
# Copyright (C) 2012
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
"""Test case for the apertif_unb1_correlator_mesh_ref design.
Description:
Usage:
> python tc_apertif_unb1_correlator_mesh_ref.py --gn 0:7 --sim
"""
###############################################################################
# System imports
import test_case
import node_io
import unb_apertif as apr
import pi_diag_block_gen
import pi_diag_data_buffer
import pi_bsn_monitor
import pi_ppsh
import dsp_test
import sys, os
import subprocess
import time
import pylab as pl
import numpy as np
import scipy as sp
import random
from tools import *
from common import *
import mem_init_file
###############################################################################
# Create a test case object
tc = test_case.Testcase('TB - ', '')
# Constants/Generics that are shared between VHDL and Python
# Name Value Default Description
# START_VHDL_GENERICS
g_nof_input_streams = 8 # 16
g_blocks_per_sync = 16 #32 # 781250
# Define settings for the block generator
c_bg_nof_streams = g_nof_input_streams
c_bg_ram_size = 128
c_db_nof_streams = g_nof_input_streams
c_db_ram_size = 8
c_samples_per_packet = 128
c_gapsize = 64
c_mem_low_addr = 0
c_mem_high_addr = c_samples_per_packet-1
c_bsn_init = 42
c_in_dat_w = 8
c_nof_read_back_samples = 1
c_write_bg_data = False
c_write_bg_data_to_file = False
tc.append_log(3, '>>>')
tc.append_log(1, '>>> Title : Test script for apertif_unb1_correlator_mesh_ref' )
tc.append_log(3, '>>>')
tc.append_log(3, '')
tc.set_result('PASSED')
# Create access object for nodes
io = node_io.NodeIO(tc.nodeImages, tc.base_ip)
# Create block generator instance
bg = pi_diag_block_gen.PiDiagBlockGen(tc, io, g_nof_input_streams, c_bg_ram_size, instanceName='MESH')
# Create databuffer instances
db_mesh = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'MESH', nofStreams=c_db_nof_streams, ramSizePerStream=c_db_ram_size)
#db_im = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'IM', nofStreams=c_db_nof_streams, ramSizePerStream=c_db_ram_size)
# Create bsn-monitor instance
bsn = pi_bsn_monitor.PiBsnMonitor(tc, io, nofStreams=2)
# Create dsp_test instance for helpful methods
dsp_test_bg = dsp_test.DspTest(inDatW=c_in_dat_w)
pps = pi_ppsh.PiPpsh(tc, io, nodeNr=tc.nodeFnNrs)
if __name__ == "__main__":
################################################################################
##
## Initialize the blockgenerators
##
################################################################################
# - Write settings to the block generator
tc.append_log(3, '>>>')
tc.append_log(3, '>>> Write settings to the block generator')
tc.append_log(3, '>>>')
bg.write_block_gen_settings(c_samples_per_packet, g_blocks_per_sync, c_gapsize, c_mem_low_addr, c_mem_high_addr, c_bsn_init)
# - Create a list with the input data and write it to the RAMs of the block generator
tc.append_log(3, '>>>')
tc.append_log(3, '>>> Write data to the waveform RAM of all channels')
tc.append_log(3, '>>>')
inputData = []
dataList=[]
for h in range(tc.nofNodes):
nodeData = []
for i in xrange(0, g_nof_input_streams):
data = []
for j in xrange(0, c_samples_per_packet):
real = h & (2**c_in_dat_w-1) # Node number in real part
imag = i & (2**c_in_dat_w-1) # Streamnumber in imag part
data.append((imag << c_in_dat_w) + real)
nodeData.append(data)
dataList.append(nodeData)
inputData = []
in_h = 0
for h in tc.nodeNrs:
for i in xrange(g_nof_input_streams):
if c_write_bg_data == True:
bg.write_waveform_ram(dataList[in_h][i], i, [h])
if c_write_bg_data_to_file == True:
filename = "../../src/hex/node" + str(h) + "/bg_in_data_" + str(i) + ".hex"
mem_init_file.list_to_hex(list_in=dataList[in_h][i], filename=filename, mem_width=c_nof_complex*c_in_dat_w, mem_depth=2**(ceil_log2(c_bg_ram_size)))
dataListComplex = bg.convert_concatenated_to_complex(dataList[in_h][i], c_in_dat_w)
inputData.append(dataListComplex)
in_h = in_h+1
# - Enable the block generator
tc.append_log(3, '>>>')
tc.append_log(3, '>>> Start the block generator')
tc.append_log(3, '>>>')
tc.append_log(3, '')
bg.write_enable_pps()
# bsn.read_bsn_monitor(0)
# bsn.read_bsn_monitor(1)
do_until_ge(db_mesh.read_nof_words, ms_retry=3000, val=c_db_ram_size, s_timeout=3600)
# bsn.read_bsn_monitor(0)
# bsn.read_bsn_monitor(1)
###############################################################################
#
# Read data from data buffer
#
###############################################################################
db_out = []
for i in range(c_bg_nof_streams):
db_out.append(flatten(db_mesh.read_data_buffer(streamNr=i, n=c_nof_read_back_samples, radix='uns', width=c_nof_complex*c_in_dat_w, nofColumns=8)))
###############################################################################
#
# Verify data
#
###############################################################################
tc.append_log(0, 'The Real output ')
for i in db_out:
db_re = []
for j in i:
real = j & 0xFF
db_re.append(real)
tc.append_log(0, '%s' % str(db_re))
tc.append_log(3, '')
tc.append_log(0, 'The Imaginary output ')
for i in db_out:
db_im = []
for j in i:
imag = (j & 0xFF00) >> c_in_dat_w
db_im.append(imag)
tc.append_log(0, '%s' % str(db_im))
# tc.append_log(0, '%s' % str((i & 0xFF00) >> c_in_dat_w))
# bsn.read_bsn_monitor(0)
# bsn.read_bsn_monitor(1)
# for i in range(c_bg_nof_streams):
# for j in range(tc.nofNodes):
# for k in range(c_nof_read_back_samples):
# if db_out_re[i][j*c_nof_read_back_samples + k] != i:
# tc.append_log(1, 'Error in real part. Expected %d. Received %d', (i,db_out_re[i][j*c_nof_read_back_samples + k]))
# tc.set_result('FAILED')
# if j < 4:
# if db_out_im[i][j*c_nof_read_back_samples + k] != j*2:
# tc.append_log(1, 'Error in imag part. Expected %d. Received %d', (j*2,db_out_im[i][j*c_nof_read_back_samples + k]))
# tc.set_result('FAILED')
# else:
# if db_out_im[i][j*c_nof_read_back_samples + k] != j*2+1:
# tc.append_log(1, 'Error in imag part. Expected %d. Received %d', (j*2+1,db_out_im[i][j*c_nof_read_back_samples + k]))
# tc.set_result('FAILED')
###############################################################################
# End
tc.set_section_id('')
tc.append_log(3, '')
tc.append_log(3, '>>>')
tc.append_log(0, '>>> Test bench result: %s' % tc.get_result())
tc.append_log(3, '>>>')
sys.exit(tc.get_result())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment