Skip to content
Snippets Groups Projects
Commit e0743e63 authored by Daniel van der Schuur's avatar Daniel van der Schuur
Browse files

- Added modified version of tc_compaan_unb1_dp_offload_2ins_dev.py.

parent 663bfd33
Branches
No related tags found
No related merge requests found
#! /usr/bin/env python
###############################################################################
#
# Copyright (dC) 2013
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
# Purpose: Test LCU - loop back via DUT
# Description:
#
# LCU = Local Control Unit (models the PC)
# DUT = Device Under Test
#
# flow diagram:
#
# LCU = FN 0 1GbE link DUT = FN 1
# BG --> dp_offload_tx ----------------> dp_offload_rx ---\
# | DP loopback in DUT
# DB <-- dp_offload_rx <---------------- dp_offload_tx <--/
#
# Usage when starting from the beginning:
# > cd $RADIOHDL
# > rm -rf build/
# > python tools/oneclick/base/modelsim_config.py
# > python tools/oneclick/base/quartus_config.py
# > run_sopc unb1 compaan_unb1_dp_offload
# > run_modelsim unb1 &
# . lp compaan_unb1_dp_offload
# . mk all
# double click tb_compaan_unb1_dp_offload_2ins simulation icon
# . as 8
# . run 2 us
# > python tc_compaan_unb1_dp_offload_2ins.py --unb 0 --fn 0,1 --sim --i data_in.txt
# . run 100 us (or run -a)
#
# Usage when rebuilding the simulation:
# > cd $RADIOHDL
# > rm -rf build/
# > python tools/oneclick/base/modelsim_config.py
# > run_modelsim unb1 &
# ...etc
#
# Usage on hardware:
# > python tc_compaan_unb1_dp_offload_2ins.py --unb 0 --fn 0,1
from common import *
import test_case
import node_io
import pi_common
import pi_dp_offload_tx
import pi_dp_offload_tx_hdr_dat_unb_dp_offload
import pi_dp_offload_tx_hdr_ovr_unb_dp_offload
import time
import sys
import os
import pi_diag_block_gen
import pi_diag_data_buffer
import pi_eth
import dsp_test
import sys, os
import subprocess
import time
import pylab as pl
import numpy as np
import scipy as sp
import random
from tools import *
from common import *
import mem_init_file
###############################################################################
# Setup
###############################################################################
NOF_STREAMS = 1
STREAM_ID = 0
NOF_WORDS_PER_BLOCK = 10
NOF_BLOCKS_PER_PACKET = 1
c_bg_nof_streams = 1
c_bg_ram_size = 1024
c_in_dat_w = 16
c_bg_block_size = 900
c_bg_gapsize = 100
c_nof_int_streams = 1
c_ena_pre_transpose = True
c_gap_size = 0 #g_rd_chunksize
c_force_late_sync = 0
c_force_early_sync = 0
tc = test_case.Testcase('TB - ', '')
io = node_io.NodeIO(tc.nodeImages, tc.base_ip)
tc.set_result('PASSED')
tc.append_log(3, '>>>')
tc.append_log(1, '>>> Title : Test case for design unb_dp_offload. Targets: %s' % tc.unb_nodes_string())
tc.append_log(3, '>>>')
tc.append_log(3, '')
eth = pi_eth.PiEth(tc, io)
dpotx = pi_dp_offload_tx.PiDpOffloadTx(tc, io, nof_inst=NOF_STREAMS)
dpotx_hdr_dat = pi_dp_offload_tx_hdr_dat_unb_dp_offload.PiDpOffloadTxHdrDatUnbDpOffload(tc, io, nof_inst=NOF_STREAMS)
dpotx_hdr_ovr = pi_dp_offload_tx_hdr_ovr_unb_dp_offload.PiDpOffloadTxHdrOvrUnbDpOffload(tc, io, nof_inst=NOF_STREAMS)
# Create block generator instance
bg = pi_diag_block_gen.PiDiagBlockGen(tc, io, c_bg_nof_streams, c_bg_ram_size, tc.nodeFn1Nrs )
# Create block generator instance
db = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = '', nofStreams=NOF_STREAMS, ramSizePerStream=NOF_WORDS_PER_BLOCK, nodeNr = tc.nodeFn1Nrs )
# Create dsp_test instance for helpful methods
dsp_test_bg = dsp_test.DspTest(inDatW=32)
# Function for generating stimuli and generating hex files.
def gen_bg_hex_files(c_framesize = 64, c_nof_streams = 4):
data = []
data_in = ""
fh = open('data_in.txt', 'r')
for i in range(c_nof_streams):
stream_re = []
stream_im = []
for j in range(c_framesize):
data_in = fh.readline()
if data_in:
stream_re.append(int(data_in))
else:
stream_re.append(99)
stream_im.append(i)
data_concat = dsp_test_bg.concatenate_two_lists(stream_re, stream_im, c_in_dat_w)
data.append(data_concat)
# filename = "../../src/hex/tb_bg_dat_" + str(i) + ".hex"
# mem_init_file.list_to_hex(list_in=data_concat, filename=filename, mem_width=c_nof_complex*c_in_dat_w, mem_depth=2**(ceil_log2(c_bg_ram_size)))
return data
# Prepare x stimuli for block generator
bg_data = gen_bg_hex_files(c_bg_ram_size, c_bg_nof_streams)
# Write the stimuli to the block generator and enable the block generator
for i in range(c_bg_nof_streams):
bg.write_waveform_ram(data=bg_data[i], channelNr= i)
# Write setting for the block generator:
bg.write_block_gen_settings(samplesPerPacket=c_bg_block_size, blocksPerSync=16, gapSize=c_bg_gapsize, memLowAddr=0, memHighAddr=c_bg_block_size-1, BSNInit=10)
# Wait until the first 2us in simulation have passed before continue
# Argument "val" defines the number of ns to wait.
if tc.sim==True:
do_until_gt(io.simIO.getSimTime, ms_retry=1000, val=2000, s_timeout=13600)
###############################################################################
# Task
###############################################################################
# Disable the blockgenerator
bg.write_disable()
# Enable the UDP port on the 1GbE RX side for stream 0
eth.write_udp_port_en(STREAM_ID)
# Write the dp_offload_tx settings
registers = [('nof_words_per_block', NOF_WORDS_PER_BLOCK), ('nof_blocks_per_packet', NOF_BLOCKS_PER_PACKET)]
dpotx.write(inst_nrs=STREAM_ID, registers=registers, regmap=dpotx.regmap)
# From Compaan to DB: set dst MAC and IP of node[0] to MAC and IP of node[1]
dpotx_hdr_dat.write(node_nrs=tc.nodeNrs[0], inst_nrs=tc.gpNumbers, registers=[('eth_dst_mac', 0x2286080500 + 1)], regmap=dpotx_hdr_dat.regmap)
dpotx_hdr_dat.write(node_nrs=tc.nodeNrs[0], inst_nrs=tc.gpNumbers, registers=[('ip_dst_ip', 0x0A630500 + 1 +1)], regmap=dpotx_hdr_dat.regmap)
# From BG to Compaan: set dst MAC and IP of node[0] to MAC and IP of node[1]
dpotx_hdr_dat.write(node_nrs=tc.nodeNrs[1], inst_nrs=tc.gpNumbers, registers=[('eth_dst_mac', 0x2286080500 + 0)], regmap=dpotx_hdr_dat.regmap)
dpotx_hdr_dat.write(node_nrs=tc.nodeNrs[1], inst_nrs=tc.gpNumbers, registers=[('ip_dst_ip', 0x0A630500 + 0 + 1)], regmap=dpotx_hdr_dat.regmap)
# From BG to DOP17
#dpotx_hdr_dat.write(node_nrs=tc.nodeNrs[0], inst_nrs=tc.gpNumbers, registers=[('eth_dst_mac', 0x0030482D82A1)], regmap=dpotx_hdr_dat.regmap)
#dpotx_hdr_dat.write(node_nrs=tc.nodeNrs[0], inst_nrs=tc.gpNumbers, registers=[('ip_dst_addr', 0x0A6300FE)], regmap=dpotx_hdr_dat.regmap)
# Now override the eth_dst_mac field so it is no longer read from the data path but from the MM register we've just written to.
#dpotx_hdr_ovr.write(inst_nrs=tc.gpNumbers, registers=[('eth_dst_mac', 1)], regmap=dpotx_hdr_ovr.regmap)
# Enable the blockgenerator
bg.write_enable()
# Poll the databuffer to check if the response is there.
# Retry after 3 seconds so we don't issue too many MM reads in case of simulation.
do_until_ge(db.read_nof_words, ms_retry=3000, val=NOF_WORDS_PER_BLOCK, s_timeout=3600)
###############################################################################
#
# Read transposed data from data buffer
#
###############################################################################
db_out = []
for i in range(NOF_STREAMS):
db_out.append(flatten(db.read_data_buffer(streamNr=i, n=NOF_WORDS_PER_BLOCK, radix='uns', width=c_in_dat_w, nofColumns=8)))
print db_out
###############################################################################
# end
###############################################################################
tc.set_section_id('')
tc.append_log(0, '>>> Test Case result: %s' % tc.get_result())
sys.exit(tc.get_result())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment