Skip to content
Snippets Groups Projects
Commit 953f3fda authored by Kenneth Hiemstra's avatar Kenneth Hiemstra
Browse files

del

parent ec1d10b5
No related branches found
No related tags found
No related merge requests found
#! /usr/bin/env python
###############################################################################
#
# Copyright (C) 2013
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
# Purpose:
# . Test dp_offload_tx and dp_offload_rx, connected via several
# interfaces - each interface having its own design revision.
# Description:
# . Usage:
# . Sim: Target only one node:
# $ python tc_unb_dp_offload.py --unb 0 --fn 0 -r 0:2 --sim
# . Synth: Target any TWO nodes when using 1GbE or 10GbE revision:
# $ python tc_unb_dp_offload.py --unb 0 --fn 0,1 -r 0:2
from common import *
import test_case
import node_io
import pi_common
import time
import sys
import os
import pi_system_info
import pi_diag_block_gen
import pi_dp_offload_tx
import pi_dp_offload_tx_hdr_dat_unb_dp_offload
import pi_dp_offload_tx_hdr_ovr_unb_dp_offload
import pi2_bsn_monitor
import pi_eth
import pi_diag_block_gen
import pi_diag_data_buffer
###############################################################################
# Setup
###############################################################################
tc = test_case.Testcase('TB - ', '')
io = node_io.NodeIO(tc.nodeImages, tc.base_ip)
NOF_STREAMS = 3
NOF_BLOCKS_PER_SYNC = 1
NOF_WORDS_PER_BLOCK_TEST_RANGE = [769, 300, 2, 2, 2] # 10GbE: (769+8header words)/1000 *200MHz *64b = 9.9456 Gbps.
NOF_BLOCKS_PER_PACKET_TEST_RANGE = sel_a_b( tc.sim, [ 1, 1, 1, 2, 3], [ 1, 1, 1, 300, 400] ) # Less blocks per packet in sim; otherwise sim takes half an hour.
tc.set_result('PASSED')
tc.append_log(3, '>>>')
tc.append_log(1, '>>> Title : Test case for design unb_dp_offload. Targets: %s' % tc.unb_nodes_string())
tc.append_log(3, '>>>')
tc.append_log(3, '')
dpotx = pi_dp_offload_tx.PiDpOffloadTx(tc, io, nof_inst=NOF_STREAMS)
dpotx_hdr_dat = pi_dp_offload_tx_hdr_dat_unb_dp_offload.PiDpOffloadTxHdrDatUnbDpOffload(tc, io, nof_inst=NOF_STREAMS)
dpotx_hdr_ovr = pi_dp_offload_tx_hdr_ovr_unb_dp_offload.PiDpOffloadTxHdrOvrUnbDpOffload(tc, io, nof_inst=NOF_STREAMS)
info = pi_system_info.PiSystemInfo(tc, io)
mon = pi2_bsn_monitor.PiBsnMonitor(tc, io, nof_inst=NOF_STREAMS)
eth = pi_eth.PiEth(tc, io)
###############################################################################
# Task
###############################################################################
# Get design name to determine revision
name = info.read_design_name()
if name[0]=='unb_dp_offload_1GbE':
# The main (loopback) and 10GbE revisions can easily handle the 200MHz*32b*(900/1000) = ~6Gbps (max) streams,
# but we need lower data rate test ranges for the 1GbE revision. Keep in mind that 3 streams are multiplexed
# onto the same 1GbE link and that the header overhead is 16 words/packet.
NOF_WORDS_PER_BLOCK_TEST_RANGE = [11, 6, 2, 1]
NOF_BLOCKS_PER_PACKET_TEST_RANGE = [ 1, 2, 7, 13]
# for each stream, enable the corresponding UDP port on the 1GbE RX side
for stream in tc.gpNumbers:
eth.write_udp_port_en(stream)
if tc.sim==False:
# In sim the (one) node is looped back to itself (eth tx -> eth rx), but on the board there's a switch so we
# need to send data from node[0] to node[1] and vice versa.
# Set dst MAC of node[0] to MAC of node[1]
dpotx_hdr_dat.write(node_nrs=tc.nodeNrs[0], inst_nrs=tc.gpNumbers, registers=[('eth_dst_mac', 0x2286080000 + tc.nodeNrs[1])], regmap=dpotx_hdr_dat.regmap)
# Set dst MAC of node[1] to MAC of node[0]
dpotx_hdr_dat.write(node_nrs=tc.nodeNrs[1], inst_nrs=tc.gpNumbers, registers=[('eth_dst_mac', 0x2286080000 + tc.nodeNrs[0])], regmap=dpotx_hdr_dat.regmap)
# Now override the eth_dst_mac field so it is no longer read from the data path but from the MM register we've just written to.
dpotx_hdr_ovr.write(inst_nrs=tc.gpNumbers, registers=[('eth_dst_mac', 1)], regmap=dpotx_hdr_ovr.regmap)
for rep in range(tc.repeat):
for i in range(len(NOF_WORDS_PER_BLOCK_TEST_RANGE)):
# Write the dp_offload_tx settings
registers = [('nof_words_per_block', NOF_WORDS_PER_BLOCK_TEST_RANGE[i]), ('nof_blocks_per_packet', NOF_BLOCKS_PER_PACKET_TEST_RANGE[i])]
dpotx.write(inst_nrs=tc.gpNumbers, registers=registers, regmap=dpotx.regmap)
# Calculate our reference block size
block_len = NOF_WORDS_PER_BLOCK_TEST_RANGE[i] * NOF_BLOCKS_PER_PACKET_TEST_RANGE[i]
print 'block_len=',block_len
# Declare our reference values
ref = { 'xon_stable' : 1,
'ready_stable' : 1,
'sync_timeout' : 0,
# 'bsn_at_sync' : 0 ,# increments every second on HW
'nof_sop' : NOF_BLOCKS_PER_SYNC,
'nof_valid' : NOF_BLOCKS_PER_SYNC*block_len,
'nof_err' : 0,
'block_len' : block_len}
# 'nof_sync' : 0 } # increments every second on HW
# Keep reading the block_len at the RX side until it matches our settings at the TX side
do_until_retry_interval = sel_a_b(tc.sim, 2000, 1)
do_until_timeout_interval = sel_a_b(tc.sim, 300, 1)
#do_until_block_len = do_until_eq( method=mon.read, val=block_len, ms_retry=do_until_retry_interval, s_timeout=do_until_timeout_interval, inst_nrs=tc.gpNumbers, registers=['block_len'], regmap=mon.regmap, flatten_result=True)
# Read BSN moitors and compare to reference
pi_common.ref_compare(tc, mon.read(inst_nrs=tc.gpNumbers, regmap=mon.regmap), ref)
nof_streams=3
blocksize=0
Bg = pi_diag_block_gen.PiDiagBlockGen(tc,io,nofChannels=nof_streams,ramSizePerChannel=blocksize)
Bg.write_disable()
settings = Bg.read_block_gen_settings()
samples_per_packet = settings[0][1]
gapsize = settings[0][3]
blocksize = pow(2, ceil_log2(samples_per_packet+gapsize))
Bg = pi_diag_block_gen.PiDiagBlockGen(tc,io,nofChannels=nof_streams, ramSizePerChannel=blocksize)
#Bg.write_block_gen_settings(samplesPerPacket=700, blocksPerSync=781250, gapSize=300, memLowAddr=0, memHighAddr=701, BSNInit=42)
Db = pi_diag_data_buffer.PiDiagDataBuffer(tc,io,nofStreams=nof_streams,ramSizePerStream=blocksize)
resetptrn = [0xffffffff]*samples_per_packet + [0]*(blocksize-samples_per_packet)
for s in tc.spNrs:
Db.overwrite_data_buffer(resetptrn,streamNr=s,vLevel=9)
Bg.write_enable()
bg_ram = []
for s in tc.spNrs:
ram = Bg.read_waveform_ram(channelNr=s,vLevel=5)
rram=[]
for r in ram: rram.append(list(r)) # () -> []
bg_ram.append(rram)
db_ram = []
for s in tc.spNrs:
db_ram.append(Db.read_data_buffer(streamNr=s,vLevel=5))
Bg.write_disable()
time.sleep(tc.time)
###############################################################################
# end
###############################################################################
tc.set_section_id('')
tc.append_log(0, '>>> Test Case result: %s' % tc.get_result())
sys.exit(tc.get_result())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment